RxJava2 connectable observable - replay not emitting all previous items
    List<Integer> list = new ArrayList<Integer>();
    for (int j = 1; j <= 3; j++)
        list.add(j);

    Observable<Integer> observable = Observable.fromIterable(list)
            .replay()
            .autoConnect();

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer1:", "" + integer);
        }
    });

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer2:", "" + integer);
        }
    });

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer3:", "" + integer);
        }
    });
When I run the above code, I get the following output:
consumer1:: 1
consumer1:: 2
consumer1:: 3
consumer2:: 1
consumer2:: 2
consumer2:: 3
consumer3:: 1
consumer3:: 2
consumer3:: 3
I was expecting replay to actually "replay" all the history that occurred before, so each new subscriber would also receive everything that had been emitted to the earlier subscribers. Specifically, this is the output I expected:
//first time nothing to replay so just do the work
consumer1:: 1
consumer1:: 2
consumer1:: 3
//replay consumer1 stream:
consumer2:: 1
consumer2:: 2
consumer2:: 3
//already replayed now do the work
consumer2:: 1
consumer2:: 2
consumer2:: 3
//replay consumer1 stream:
consumer3:: 1
consumer3:: 2
consumer3:: 3
//replay consumer2 stream:
consumer3:: 1
consumer3:: 2
consumer3:: 3
//now do the work
consumer3:: 1
consumer3:: 2
consumer3:: 3
This is what I expect to happen with replay. What am I doing wrong? It looks as if replay is not working at all the way I have it.
rx-java2
Each subscription is an individual connection into the cache. Why would it replay the cached items multiple times to the same consumer based on how many previous consumers there were?
– akarnokd
2 days ago
It's not the same consumer. I have used "new Consumer(...)" in each subscription.
– j2emanue
2 days ago
Can you show me how to get what I want, then? What is the purpose of a replay subject if it is not replaying the previous emissions? How can this be achieved?
– j2emanue
2 days ago
It replays the items received by it, not the items it replayed to consumers. That doesn't make much sense and would lead to longer and longer sequences as more and more consumers will repeatedly get the same few items over and over. The 100th consumer would get 300 items with your expectation!
– akarnokd
2 days ago
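To make the point in the comment above concrete, here is a rough sketch in plain Java. It does not use RxJava: `ReplayModel` and its methods are hypothetical names made up for illustration, and the class only imitates what `replay().autoConnect()` does for a small synchronous source. The idea it shows is that the cache holds the items received from the source, so every subscriber gets those items exactly once, no matter how many subscribers came before.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy model only (hypothetical class): it imitates replay().autoConnect()
// for a synchronous, finite source. It is not how RxJava is implemented.
class ReplayModel {
    private final List<Integer> source;
    private final List<Integer> cache = new ArrayList<>();
    private boolean connected = false;
    private int sourceReads = 0;

    ReplayModel(List<Integer> source) {
        this.source = source;
    }

    // The first subscriber triggers the single connection to the source;
    // every subscriber then receives the cached source items exactly once.
    void subscribe(Consumer<Integer> consumer) {
        if (!connected) {
            connected = true;
            sourceReads++;
            cache.addAll(source);
        }
        for (Integer item : cache) {
            consumer.accept(item);
        }
    }

    // How many times the source itself was consumed.
    int sourceReads() {
        return sourceReads;
    }

    // Subscribes the given number of consumers and records what each one saw.
    static List<List<Integer>> run(int subscribers) {
        ReplayModel model = new ReplayModel(Arrays.asList(1, 2, 3));
        List<List<Integer>> received = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            List<Integer> seen = new ArrayList<>();
            model.subscribe(seen::add);
            received.add(seen);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = run(3);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("consumer" + (i + 1) + " got " + received.get(i));
        }
    }
}
```

In this model each of the three consumers ends up with `[1, 2, 3]`, which matches the output actually observed in the question, and the source is only read once.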
edited 2 days ago
akarnokd
48.1k996145
asked 2 days ago
j2emanue
20.7k21135243
1 Answer
accepted
Not sure why you wanted this exotic behavior, but you can repeat with an ever increasing number:
    List<Integer> list = new ArrayList<Integer>();
    for (int j = 1; j <= 3; j++) {
        list.add(j);
    }

    AtomicInteger count = new AtomicInteger();

    Observable<Integer> observable =
        Observable.defer(() -> {
            // defer runs this body once per subscription, so each new
            // subscriber repeats the cached items one more time
            return Observable.fromIterable(list)
                .replay()
                .autoConnect()
                .repeat(count.incrementAndGet());
        });

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer1:", "" + integer);
        }
    });

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer2:", "" + integer);
        }
    });

    observable.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.v("consumer3:", "" + integer);
        }
    });
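As a sanity check on what this approach should deliver, here is a small plain-Java sketch of the same bookkeeping. It is only an illustration and does not use RxJava: `RepeatPerSubscriberModel` and `simulate` are hypothetical names, and the loop body stands in for the work done inside `defer`, assuming the counter is incremented once per subscription as in the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only (hypothetical class): mirrors the counter + repeat(n)
// idea from the answer without depending on RxJava.
class RepeatPerSubscriberModel {

    // Returns, for each subscriber in order, the full sequence it would see.
    static List<List<Integer>> simulate(List<Integer> source, int subscribers) {
        AtomicInteger count = new AtomicInteger();
        List<List<Integer>> received = new ArrayList<>();
        for (int s = 0; s < subscribers; s++) {
            // Evaluated once per subscription, like the body of defer.
            int times = count.incrementAndGet();
            List<Integer> seen = new ArrayList<>();
            for (int r = 0; r < times; r++) {
                seen.addAll(source); // one pass over the cached items
            }
            received.add(seen);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = simulate(Arrays.asList(1, 2, 3), 3);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("consumer" + (i + 1) + " got "
                    + received.get(i).size() + " items: " + received.get(i));
        }
    }
}
```

Under these assumptions the three consumers receive 3, 6 and 9 items, which is the shape of the output the question asked for. Calling `simulate` with 100 subscribers gives the last one 300 items, the growth pointed out in the comments on the question.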
What is the difference between replay method and cache method in your answer?
– j2emanue
2 days ago
stackoverflow.com/questions/32522017/…
– akarnokd
2 days ago
I see. replay().autoConnect() is the equivalent of cache(). Thanks.
– j2emanue
2 days ago
answered 2 days ago
akarnokd
48.1k996145