Why does Monix Observable produce one more element than needed?
I'm playing with Monix streams and have an example where I build an `Observable` from an `Iterator`. When run, it seems to produce one more element than I'd expect. The following code shows that:
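
    import monix.eval.Task
    // Assumed scheduler: the question does not show which one is in scope.
    import monix.execution.Scheduler.Implicits.global
    import monix.execution.atomic.AtomicLong
    import monix.reactive.{Consumer, Observable}
    import scala.concurrent.duration._

    // `logger` is defined elsewhere in the enclosing object (not shown in the question).
    val count = AtomicLong(0)

    // Increments the counter and logs the value that is about to be emitted.
    def produceValue(): Long = {
      count.transformAndGet { i =>
        logger.info(s"Producing value: ${i + 1}")
        i + 1
      }
    }

    def more(): Boolean = count.get < 20

    lazy val iter = new Iterator[Long] {
      override def hasNext: Boolean = more()
      override def next(): Long = produceValue()
    }

    Observable
      .fromIterator(iter)
      .mapParallelUnordered(5) { x =>
        Task(x)
          .foreachL { x =>
            logger.info(s"Transforming $x")
          }
          .delayExecution(3.seconds)
      }
      .consumeWith(Consumer.complete)
      .runAsync
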
The case is quite simple. There is an `Iterator` that logs a message every time it produces the next value. The downstream stage is a simple delayed task run with a parallelism of 5, to see what's happening. The output is as follows:
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17
As you can see, the stream initially produces 6 elements, while I'd expect only 5 (as the downstream stage `mapParallelUnordered` takes only 5 elements at a time). That's not a big deal, but I want to understand why this happens.
Also, why are the initial values produced on the `main` thread while subsequent ones are produced on the `execution-context` thread pool? Shouldn't everything use the scheduler that runs the entire stream?
scala stream reactive-streams monix
asked Nov 25 '18 at 16:48
Michal Ostruszka
1 Answer
> As you can see, initially the stream produces 6 elements

The low-level communication protocol is designed around a `Subscriber` and its (inherited) method `onNext`, which has the following signature:

    def onNext(elem: A): Future[Ack]

If we picture creation and transformation each as a stage, the source observable (`fromIterator` in your case) pushes a value down to its subscriber and, once that value is acknowledged, pushes the next one.
So what happens is:

- The `fromIterator` stage generates value 1.
- Value 1 is pushed to the `mapParallelUnordered` stage, where it is accepted (because there are free workers), so the acknowledgement is an immediate `Continue`.
- The above steps are repeated for values 2-5.
- The `fromIterator` stage generates value 6 (this is when you see the output).
- Value 6 is pushed to the `mapParallelUnordered` stage. This time it cannot be accepted immediately, so the acknowledgement is a `Continue` that arrives some time later. Until then, no more values are generated by `fromIterator`.

Note that it's not the `mapParallelUnordered` stage that pulls values out of `fromIterator`. Rather, `fromIterator` generates these values on its own, and it cannot know in advance whether the downstream transformation will accept a value immediately or not.
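
The steps above can be modelled with a small standard-library sketch. This is not Monix code: `Ack`, `Continue`, `ParallelStage`, `IteratorSource` and `workerDone` are hypothetical stand-ins that only imitate the shape of the `onNext` protocol, and a same-thread `ExecutionContext` is used so the run is deterministic.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}

object PushProtocolSketch {
  // Hypothetical stand-in for Monix's Ack; only the "continue" case is modelled.
  sealed trait Ack
  case object Continue extends Ack

  // Runs callbacks on the calling thread so the demo is deterministic.
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(r: Runnable): Unit = r.run()
    def reportFailure(t: Throwable): Unit = throw t
  }

  // Toy downstream stage with a fixed number of worker slots.
  final class ParallelStage(parallelism: Int) {
    private var busy = 0
    private var parked: Option[Promise[Ack]] = None

    // Same shape as Subscriber#onNext: the returned Future is the acknowledgement.
    def onNext(elem: Long): Future[Ack] =
      if (busy < parallelism) {
        busy += 1
        Future.successful(Continue) // free worker: acknowledge immediately
      } else {
        val p = Promise[Ack]()      // no free worker: acknowledge later
        parked = Some(p)
        p.future
      }

    // A worker finished; a parked element (if any) takes over its slot.
    def workerDone(): Unit = parked match {
      case Some(p) =>
        parked = None
        p.success(Continue)
      case None =>
        busy -= 1
    }
  }

  // Toy source: produces a value first, then pushes it and waits for the ack.
  final class IteratorSource(iter: Iterator[Long], out: ParallelStage) {
    val produced = ListBuffer.empty[Long]

    def run(): Unit = {
      var keepGoing = true
      while (keepGoing && iter.hasNext) {
        val elem = iter.next()
        produced += elem              // the value already exists before the push
        val ack = out.onNext(elem)
        if (!ack.isCompleted) {
          ack.foreach(_ => run())     // resume only once acknowledged
          keepGoing = false
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val stage  = new ParallelStage(parallelism = 5)
    val source = new IteratorSource(Iterator.iterate(1L)(_ + 1).take(20), stage)

    source.run()
    println(source.produced.mkString(", ")) // 1, 2, 3, 4, 5, 6

    (1 to 5).foreach(_ => stage.workerDone())
    println(source.produced.mkString(", ")) // 1 through 11
  }
}
```

In this model the source has no way to ask whether a slot is free; it learns that only from the acknowledgement of a value it has already produced, which is why a sixth value appears before any task completes, and why five finished workers lead to values 7 through 11.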
> Shouldn't all be using scheduler that is used to run entire stream?

Monix `Observable` tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless controlled explicitly by methods like `executeAsync`, `executeOn`, etc., you won't be able to tell whether an operation will execute on the same thread or not.
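
As a rough illustration of this thread hopping, here is a standard-library sketch (again, not Monix code). It assumes that work continues on whichever thread completes the pending acknowledgement; in Monix the scheduler decides where the continuation runs, so treat this only as a model. The names `ThreadHoppingSketch`, `demo` and `worker-1` are made up for the example.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Promise}

object ThreadHoppingSketch {
  // Runs callbacks on whichever thread completes the future (no thread switch).
  implicit val sameThread: ExecutionContext = new ExecutionContext {
    def execute(r: Runnable): Unit = r.run()
    def reportFailure(t: Throwable): Unit = throw t
  }

  // Returns (value, name of the thread that produced it) pairs.
  def demo(): List[(Int, String)] = {
    val log  = ListBuffer.empty[(Int, String)]
    val ack  = Promise[Unit]()
    val done = new CountDownLatch(1)

    def produce(n: Int): Unit = log.synchronized {
      val entry = (n, Thread.currentThread.getName)
      log += entry
    }

    // Synchronous part: stays on the thread that started the stream.
    produce(1)

    // Asynchronous boundary: the next value is produced only when the ack arrives.
    ack.future.foreach { _ =>
      produce(2)
      done.countDown()
    }

    // A hypothetical worker thread finishes its task and completes the ack.
    val worker = new Thread(new Runnable {
      def run(): Unit = { ack.success(()); () }
    }, "worker-1")
    worker.start()

    done.await()
    log.synchronized(log.toList)
  }

  def main(args: Array[String]): Unit =
    demo().foreach { case (n, thread) => println(s"value $n produced on $thread") }
}
```

The first value is recorded on the calling thread and the second on `worker-1`, which mirrors the log in the question: values 1 to 6 appear on `main`, and later values appear on pool threads.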
Thanks! I thought it's more like the item is not produced until there is a demand from downstream, not that it stays ready to be pushed
– Michal Ostruszka
Nov 26 '18 at 14:16
answered Nov 26 '18 at 9:00
Oleg Pyzhcov