fs2.Stream observeAsync does not execute a given sink asynchronously










0















I'm experimenting with fs2.Stream concurrent features and got some misunderstanding about how it works. I would like to send stream content through some sink in parallel. Here is what I tried:



object TestParallelStream extends App 
val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO
println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
Thread.sleep(5000)
)
val executor = Executors.newFixedThreadPool(4)
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))


stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
executor.shutdown()



The //1 prints the following content:



[1 second]: 1
[6 second]: 2
[11 second]: 3
[16 second]: 4
[21 second]: 5
[26 second]: 6
[31 second]: 7
[36 second]: 8
[41 second]: 9


As can be seen from the output, each element is sent through the sink sequentially.



But if I modify the sink as follows:



// 5 limit and parEvalMap
val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO
println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
Thread.sleep(5000)
)


The output is:



[1 second]: 3
[1 second]: 2
[1 second]: 4
[1 second]: 1
[6 second]: 5
[6 second]: 6
[6 second]: 7
[6 second]: 8
[11 second]: 9


Now we have 4 elements are being sent through the sink in parallel at a time (in spite of setting 3 as a limit of observerAsync).



Even if I replace observerAsync with just observe I got the same parallelization effect.



Can you please clarify how sinks actually work?










share|improve this question


























    0















    I'm experimenting with fs2.Stream concurrent features and got some misunderstanding about how it works. I would like to send stream content through some sink in parallel. Here is what I tried:



    object TestParallelStream extends App 
    val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
    val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
    val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO
    println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
    Thread.sleep(5000)
    )
    val executor = Executors.newFixedThreadPool(4)
    implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))


    stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
    executor.shutdown()



    The //1 prints the following content:



    [1 second]: 1
    [6 second]: 2
    [11 second]: 3
    [16 second]: 4
    [21 second]: 5
    [26 second]: 6
    [31 second]: 7
    [36 second]: 8
    [41 second]: 9


    As can be seen from the output, each element is sent through the sink sequentially.



    But if I modify the sink as follows:



    // 5 limit and parEvalMap
    val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO
    println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
    Thread.sleep(5000)
    )


    The output is:



    [1 second]: 3
    [1 second]: 2
    [1 second]: 4
    [1 second]: 1
    [6 second]: 5
    [6 second]: 6
    [6 second]: 7
    [6 second]: 8
    [11 second]: 9


    Now we have 4 elements are being sent through the sink in parallel at a time (in spite of setting 3 as a limit of observerAsync).



    Even if I replace observerAsync with just observe I got the same parallelization effect.



    Can you please clarify how sinks actually work?










    share|improve this question
























      0












      0








      0








      I'm experimenting with fs2.Stream concurrent features and got some misunderstanding about how it works. I would like to send stream content through some sink in parallel. Here is what I tried:



      object TestParallelStream extends App 
      val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
      val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
      val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO
      println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
      Thread.sleep(5000)
      )
      val executor = Executors.newFixedThreadPool(4)
      implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))


      stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
      executor.shutdown()



      The //1 prints the following content:



      [1 second]: 1
      [6 second]: 2
      [11 second]: 3
      [16 second]: 4
      [21 second]: 5
      [26 second]: 6
      [31 second]: 7
      [36 second]: 8
      [41 second]: 9


      As can be seen from the output, each element is sent through the sink sequentially.



      But if I modify the sink as follows:



      // 5 limit and parEvalMap
      val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO
      println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
      Thread.sleep(5000)
      )


      The output is:



      [1 second]: 3
      [1 second]: 2
      [1 second]: 4
      [1 second]: 1
      [6 second]: 5
      [6 second]: 6
      [6 second]: 7
      [6 second]: 8
      [11 second]: 9


      Now we have 4 elements are being sent through the sink in parallel at a time (in spite of setting 3 as a limit of observerAsync).



      Even if I replace observerAsync with just observe I got the same parallelization effect.



      Can you please clarify how sinks actually work?










      share|improve this question














      I'm experimenting with fs2.Stream concurrent features and got some misunderstanding about how it works. I would like to send stream content through some sink in parallel. Here is what I tried:



      object TestParallelStream extends App 
      val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
      val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
      val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO
      println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
      Thread.sleep(5000)
      )
      val executor = Executors.newFixedThreadPool(4)
      implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))


      stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
      executor.shutdown()



      The //1 prints the following content:



      [1 second]: 1
      [6 second]: 2
      [11 second]: 3
      [16 second]: 4
      [21 second]: 5
      [26 second]: 6
      [31 second]: 7
      [36 second]: 8
      [41 second]: 9


      As can be seen from the output, each element is sent through the sink sequentially.



      But if I modify the sink as follows:



      // 5 limit and parEvalMap
      val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO
      println(s"[$TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart second]: $i")
      Thread.sleep(5000)
      )


      The output is:



      [1 second]: 3
      [1 second]: 2
      [1 second]: 4
      [1 second]: 1
      [6 second]: 5
      [6 second]: 6
      [6 second]: 7
      [6 second]: 8
      [11 second]: 9


      Now we have 4 elements are being sent through the sink in parallel at a time (in spite of setting 3 as a limit of observerAsync).



      Even if I replace observerAsync with just observe I got the same parallelization effect.



      Can you please clarify how sinks actually work?







      scala functional-programming scala-cats fs2






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 14 '18 at 9:52









      Some NameSome Name

      1,435417




      1,435417






















          1 Answer
          1






          active

          oldest

          votes


















          1














          observe is used when you want to pass stream elements through multiple sinks. It doesn't change the concurrency behavior of the sink itself.



          You'd use it like this:



          stream.observeAsync(n)(sink1).to(sink2)





          share|improve this answer






















            Your Answer






            StackExchange.ifUsing("editor", function ()
            StackExchange.using("externalEditor", function ()
            StackExchange.using("snippets", function ()
            StackExchange.snippets.init();
            );
            );
            , "code-snippets");

            StackExchange.ready(function()
            var channelOptions =
            tags: "".split(" "),
            id: "1"
            ;
            initTagRenderer("".split(" "), "".split(" "), channelOptions);

            StackExchange.using("externalEditor", function()
            // Have to fire editor after snippets, if snippets enabled
            if (StackExchange.settings.snippets.snippetsEnabled)
            StackExchange.using("snippets", function()
            createEditor();
            );

            else
            createEditor();

            );

            function createEditor()
            StackExchange.prepareEditor(
            heartbeatType: 'answer',
            autoActivateHeartbeat: false,
            convertImagesToLinks: true,
            noModals: true,
            showLowRepImageUploadWarning: true,
            reputationToPostImages: 10,
            bindNavPrevention: true,
            postfix: "",
            imageUploader:
            brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
            contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
            allowUrls: true
            ,
            onDemand: true,
            discardSelector: ".discard-answer"
            ,immediatelyShowMarkdownHelp:true
            );



            );













            draft saved

            draft discarded


















            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53297312%2ffs2-stream-observeasync-does-not-execute-a-given-sink-asynchronously%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown

























            1 Answer
            1






            active

            oldest

            votes








            1 Answer
            1






            active

            oldest

            votes









            active

            oldest

            votes






            active

            oldest

            votes









            1














            observe is used when you want to pass stream elements through multiple sinks. It doesn't change the concurrency behavior of the sink itself.



            You'd use it like this:



            stream.observeAsync(n)(sink1).to(sink2)





            share|improve this answer



























              1














              observe is used when you want to pass stream elements through multiple sinks. It doesn't change the concurrency behavior of the sink itself.



              You'd use it like this:



              stream.observeAsync(n)(sink1).to(sink2)





              share|improve this answer

























                1












                1








                1







                observe is used when you want to pass stream elements through multiple sinks. It doesn't change the concurrency behavior of the sink itself.



                You'd use it like this:



                stream.observeAsync(n)(sink1).to(sink2)





                share|improve this answer













                observe is used when you want to pass stream elements through multiple sinks. It doesn't change the concurrency behavior of the sink itself.



                You'd use it like this:



                stream.observeAsync(n)(sink1).to(sink2)






                share|improve this answer












                share|improve this answer



                share|improve this answer










                answered Nov 19 '18 at 16:16









                DaenythDaenyth

                24.2k964104




                24.2k964104





























                    draft saved

                    draft discarded
















































                    Thanks for contributing an answer to Stack Overflow!


                    • Please be sure to answer the question. Provide details and share your research!

                    But avoid


                    • Asking for help, clarification, or responding to other answers.

                    • Making statements based on opinion; back them up with references or personal experience.

                    To learn more, see our tips on writing great answers.




                    draft saved


                    draft discarded














                    StackExchange.ready(
                    function ()
                    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53297312%2ffs2-stream-observeasync-does-not-execute-a-given-sink-asynchronously%23new-answer', 'question_page');

                    );

                    Post as a guest















                    Required, but never shown





















































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown

































                    Required, but never shown














                    Required, but never shown












                    Required, but never shown







                    Required, but never shown







                    Popular posts from this blog

                    Use pre created SQLite database for Android project in kotlin

                    Darth Vader #20

                    Ondo