Why Monix Observable produces one element more than needed












1















I'm playing with Monix streams and got the example where I build Observable from Iterator. It seems to me like when run it produces 1 more element than I'd expect. The following code shows that:



  val count = AtomicLong(0)
def produceValue(): Long = {
count.transformAndGet { i =>
logger.info(s"Producing value: ${i + 1}")
i + 1
}
}
def more(): Boolean = count.get < 20

lazy val iter = new Iterator[Long] {
override def hasNext: Boolean = more()
override def next(): Long = produceValue()
}

Observable
.fromIterator(iter)
.mapParallelUnordered(5) { x =>
Task(x)
.foreachL { x =>
logger.info(s"Transforming $x")
}
.delayExecution(3.seconds)
}
.consumeWith(Consumer.complete)
.runAsync


The case is quite simple. There is Iterator that prints log every time it produces a next value. Downstream stage is simple delayed task run in parallel count of 5 to see what's happening. Now the output is as follows:



[INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17


As you can see, initially the stream produces 6 elements while I'd expect 5 only (as the downstream stage mapParallelUnordered takes only 5 elements. Actually that's not a big deal, but I just want to understand why is it so.



Also why the initial values are produced in main thread while subsequent ones are invoked on execution-context thread pool? Shouldn't all be using scheduler that is used to run entire stream?










share|improve this question



























    1















    I'm playing with Monix streams and got the example where I build Observable from Iterator. It seems to me like when run it produces 1 more element than I'd expect. The following code shows that:



      val count = AtomicLong(0)
    def produceValue(): Long = {
    count.transformAndGet { i =>
    logger.info(s"Producing value: ${i + 1}")
    i + 1
    }
    }
    def more(): Boolean = count.get < 20

    lazy val iter = new Iterator[Long] {
    override def hasNext: Boolean = more()
    override def next(): Long = produceValue()
    }

    Observable
    .fromIterator(iter)
    .mapParallelUnordered(5) { x =>
    Task(x)
    .foreachL { x =>
    logger.info(s"Transforming $x")
    }
    .delayExecution(3.seconds)
    }
    .consumeWith(Consumer.complete)
    .runAsync


    The case is quite simple. There is Iterator that prints log every time it produces a next value. Downstream stage is simple delayed task run in parallel count of 5 to see what's happening. Now the output is as follows:



    [INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 1
    [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
    [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
    [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
    [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
    [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
    [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17


    As you can see, initially the stream produces 6 elements while I'd expect 5 only (as the downstream stage mapParallelUnordered takes only 5 elements. Actually that's not a big deal, but I just want to understand why is it so.



    Also why the initial values are produced in main thread while subsequent ones are invoked on execution-context thread pool? Shouldn't all be using scheduler that is used to run entire stream?










    share|improve this question

























      1












      1








      1








      I'm playing with Monix streams and got the example where I build Observable from Iterator. It seems to me like when run it produces 1 more element than I'd expect. The following code shows that:



        val count = AtomicLong(0)
      def produceValue(): Long = {
      count.transformAndGet { i =>
      logger.info(s"Producing value: ${i + 1}")
      i + 1
      }
      }
      def more(): Boolean = count.get < 20

      lazy val iter = new Iterator[Long] {
      override def hasNext: Boolean = more()
      override def next(): Long = produceValue()
      }

      Observable
      .fromIterator(iter)
      .mapParallelUnordered(5) { x =>
      Task(x)
      .foreachL { x =>
      logger.info(s"Transforming $x")
      }
      .delayExecution(3.seconds)
      }
      .consumeWith(Consumer.complete)
      .runAsync


      The case is quite simple. There is Iterator that prints log every time it produces a next value. Downstream stage is simple delayed task run in parallel count of 5 to see what's happening. Now the output is as follows:



      [INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 1
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17


      As you can see, initially the stream produces 6 elements while I'd expect 5 only (as the downstream stage mapParallelUnordered takes only 5 elements. Actually that's not a big deal, but I just want to understand why is it so.



      Also why the initial values are produced in main thread while subsequent ones are invoked on execution-context thread pool? Shouldn't all be using scheduler that is used to run entire stream?










      share|improve this question














      I'm playing with Monix streams and got the example where I build Observable from Iterator. It seems to me like when run it produces 1 more element than I'd expect. The following code shows that:



        val count = AtomicLong(0)
      def produceValue(): Long = {
      count.transformAndGet { i =>
      logger.info(s"Producing value: ${i + 1}")
      i + 1
      }
      }
      def more(): Boolean = count.get < 20

      lazy val iter = new Iterator[Long] {
      override def hasNext: Boolean = more()
      override def next(): Long = produceValue()
      }

      Observable
      .fromIterator(iter)
      .mapParallelUnordered(5) { x =>
      Task(x)
      .foreachL { x =>
      logger.info(s"Transforming $x")
      }
      .delayExecution(3.seconds)
      }
      .consumeWith(Consumer.complete)
      .runAsync


      The case is quite simple. There is Iterator that prints log every time it produces a next value. Downstream stage is simple delayed task run in parallel count of 5 to see what's happening. Now the output is as follows:



      [INFO ] c.s.f.a.t.MonixSandbox$ [main] -  Producing value: 1
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
      [INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
      [INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17


      As you can see, initially the stream produces 6 elements while I'd expect 5 only (as the downstream stage mapParallelUnordered takes only 5 elements. Actually that's not a big deal, but I just want to understand why is it so.



      Also why the initial values are produced in main thread while subsequent ones are invoked on execution-context thread pool? Shouldn't all be using scheduler that is used to run entire stream?







      scala stream reactive-streams monix






      share|improve this question













      share|improve this question











      share|improve this question




      share|improve this question










      asked Nov 25 '18 at 16:48









      Michal OstruszkaMichal Ostruszka

      1,11421319




      1,11421319
























          1 Answer
          1






          active

          oldest

          votes


















          4















          As you can see, initially the stream produces 6 elements




          The low-level communication protocol is designed around a Subscriber and its (inherited) method onNext with the following signature:



          def onNext(elem: A): Future[Ack]


          (source)



          If we picture creation and transformation each being a stage, the source observable (fromIterator in your case) pushes its value down to subscribers, and, when acknowledged, pushes the next one.



          So what happens is:





          • fromIterator stage generates value 1

          • the 1 value is pushed to mapAsyncUnordered stage, where it is accepted (b/c there are free workers), so the acknowledgement is to immediately Continue

          • Above steps are repeated for values 2-5


          • fromIterator stage generates value 6 (this is when you see the output)

          • the 6 value is pushed to mapAsyncUnordered stage. This time, it cannot be accepted immediately, so the acknowledgement is to Continue some time later. Until that, no more values are generated by fromIterator.


          What's to note is that it's not mapAsyncUnordered stage that pulls the value out of fromIterator, but that fromIterator generates these values on its own, and it cannot know in advance if the downstream transformation will accept the value immediately or not.






          Shouldn't all be using scheduler that is used to run entire stream?




          Monix Observable tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless controlled explicitly by methods like executeAsync, executeOn, etc., you won't be able to tell whether operation will execute on same thread or not.






          share|improve this answer
























          • Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed

            – Michal Ostruszka
            Nov 26 '18 at 14:16











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53469684%2fwhy-monix-observable-produces-one-element-more-than-needed%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          4















          As you can see, initially the stream produces 6 elements




          The low-level communication protocol is designed around a Subscriber and its (inherited) method onNext with the following signature:



          def onNext(elem: A): Future[Ack]


          (source)



          If we picture creation and transformation each being a stage, the source observable (fromIterator in your case) pushes its value down to subscribers, and, when acknowledged, pushes the next one.



          So what happens is:





          • fromIterator stage generates value 1

          • the 1 value is pushed to mapAsyncUnordered stage, where it is accepted (b/c there are free workers), so the acknowledgement is to immediately Continue

          • Above steps are repeated for values 2-5


          • fromIterator stage generates value 6 (this is when you see the output)

          • the 6 value is pushed to mapAsyncUnordered stage. This time, it cannot be accepted immediately, so the acknowledgement is to Continue some time later. Until that, no more values are generated by fromIterator.


          What's to note is that it's not mapAsyncUnordered stage that pulls the value out of fromIterator, but that fromIterator generates these values on its own, and it cannot know in advance if the downstream transformation will accept the value immediately or not.






          Shouldn't all be using scheduler that is used to run entire stream?




          Monix Observable tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless controlled explicitly by methods like executeAsync, executeOn, etc., you won't be able to tell whether operation will execute on same thread or not.






          share|improve this answer
























          • Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed

            – Michal Ostruszka
            Nov 26 '18 at 14:16
















          4















          As you can see, initially the stream produces 6 elements




          The low-level communication protocol is designed around a Subscriber and its (inherited) method onNext with the following signature:



          def onNext(elem: A): Future[Ack]


          (source)



          If we picture creation and transformation each being a stage, the source observable (fromIterator in your case) pushes its value down to subscribers, and, when acknowledged, pushes the next one.



          So what happens is:





          • fromIterator stage generates value 1

          • the 1 value is pushed to mapAsyncUnordered stage, where it is accepted (b/c there are free workers), so the acknowledgement is to immediately Continue

          • Above steps are repeated for values 2-5


          • fromIterator stage generates value 6 (this is when you see the output)

          • the 6 value is pushed to mapAsyncUnordered stage. This time, it cannot be accepted immediately, so the acknowledgement is to Continue some time later. Until that, no more values are generated by fromIterator.


          What's to note is that it's not mapAsyncUnordered stage that pulls the value out of fromIterator, but that fromIterator generates these values on its own, and it cannot know in advance if the downstream transformation will accept the value immediately or not.






          Shouldn't all be using scheduler that is used to run entire stream?




          Monix Observable tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless controlled explicitly by methods like executeAsync, executeOn, etc., you won't be able to tell whether operation will execute on same thread or not.






          share|improve this answer
























          • Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed

            – Michal Ostruszka
            Nov 26 '18 at 14:16














          4












          4








          4








          As you can see, initially the stream produces 6 elements




          The low-level communication protocol is designed around a Subscriber and its (inherited) method onNext with the following signature:



          def onNext(elem: A): Future[Ack]


          (source)



          If we picture creation and transformation each being a stage, the source observable (fromIterator in your case) pushes its value down to subscribers, and, when acknowledged, pushes the next one.



          So what happens is:





          • fromIterator stage generates value 1

          • the 1 value is pushed to mapAsyncUnordered stage, where it is accepted (b/c there are free workers), so the acknowledgement is to immediately Continue

          • Above steps are repeated for values 2-5


          • fromIterator stage generates value 6 (this is when you see the output)

          • the 6 value is pushed to mapAsyncUnordered stage. This time, it cannot be accepted immediately, so the acknowledgement is to Continue some time later. Until that, no more values are generated by fromIterator.


          What's to note is that it's not mapAsyncUnordered stage that pulls the value out of fromIterator, but that fromIterator generates these values on its own, and it cannot know in advance if the downstream transformation will accept the value immediately or not.






          Shouldn't all be using scheduler that is used to run entire stream?




          Monix Observable tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless controlled explicitly by methods like executeAsync, executeOn, etc., you won't be able to tell whether operation will execute on same thread or not.






          share|improve this answer














          As you can see, initially the stream produces 6 elements




          The low-level communication protocol is designed around a Subscriber and its (inherited) method onNext with the following signature:



          def onNext(elem: A): Future[Ack]


          (source)



          If we picture creation and transformation each being a stage, the source observable (fromIterator in your case) pushes its value down to subscribers, and, when acknowledged, pushes the next one.



          So what happens is:





          • fromIterator stage generates value 1

          • the 1 value is pushed to mapAsyncUnordered stage, where it is accepted (b/c there are free workers), so the acknowledgement is to immediately Continue

          • Above steps are repeated for values 2-5


          • fromIterator stage generates value 6 (this is when you see the output)

          • the 6 value is pushed to mapAsyncUnordered stage. This time, it cannot be accepted immediately, so the acknowledgement is to Continue some time later. Until that, no more values are generated by fromIterator.


          What's to note is that it's not mapAsyncUnordered stage that pulls the value out of fromIterator, but that fromIterator generates these values on its own, and it cannot know in advance if the downstream transformation will accept the value immediately or not.






          Shouldn't all be using scheduler that is used to run entire stream?




          Monix Observable tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless controlled explicitly by methods like executeAsync, executeOn, etc., you won't be able to tell whether operation will execute on same thread or not.







          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 26 '18 at 9:00









          Oleg PyzhcovOleg Pyzhcov

          4,5801821




          4,5801821













          • Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed

            – Michal Ostruszka
            Nov 26 '18 at 14:16



















          • Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed

            – Michal Ostruszka
            Nov 26 '18 at 14:16

















          Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed

          – Michal Ostruszka
          Nov 26 '18 at 14:16





          Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed

          – Michal Ostruszka
          Nov 26 '18 at 14:16




















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53469684%2fwhy-monix-observable-produces-one-element-more-than-needed%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Create new schema in PostgreSQL using DBeaver

          Deepest pit of an array with Javascript: test on Codility

          Costa Masnaga