RxJava2 connectable observable - replay not emitting all previous items











List<Integer> list = new ArrayList<Integer>();
for (int j = 1; j <= 3; j++) {
    list.add(j);
}

Observable<Integer> observable = Observable.fromIterable(list)
        .replay()
        .autoConnect();

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer1:", "" + integer);
    }
});

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer2:", "" + integer);
    }
});

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer3:", "" + integer);
    }
});


When I run the above code, I get the following output:



consumer1:: 1
consumer1:: 2
consumer1:: 3
consumer2:: 1
consumer2:: 2
consumer2:: 3
consumer3:: 1
consumer3:: 2
consumer3:: 3


I was expecting replay to actually "replay" all the history that occurred before, so I expected each new subscriber to also receive the emissions delivered to the earlier subscribers. Specifically, this is the output I expected:



//first time nothing to replay so just do the work
consumer1:: 1
consumer1:: 2
consumer1:: 3

//replay consumer1 stream:
consumer2:: 1
consumer2:: 2
consumer2:: 3
//already replayed now do the work
consumer2:: 1
consumer2:: 2
consumer2:: 3

//replay consumer1 stream:
consumer3:: 1
consumer3:: 2
consumer3:: 3

//replay consumer2 stream:
consumer3:: 1
consumer3:: 2
consumer3:: 3
//now do the work
consumer3:: 1
consumer3:: 2
consumer3:: 3


This is what I am expecting to happen with replay. What am I doing wrong? It's as if replay isn't even working the way I have it.










  • Each subscription is an individual connection into the cache. Why would it replay the cached items multiple times to the same consumer based on how many previous consumers there were?
    – akarnokd
    2 days ago










  • It's not the same consumer. I used "new Consumer(...)" in each subscription.
    – j2emanue
    2 days ago






















  • Can you show me how to get what I want, then? What is the purpose of the replay subject if it is not replaying the previous emissions? How can this be achieved?
    – j2emanue
    2 days ago










  • It replays the items received by it, not the items it replayed to consumers. Replaying the replays doesn't make much sense and would lead to longer and longer sequences, as more and more consumers would repeatedly get the same few items over and over. The 100th consumer would get 300 items with your expectation!
    – akarnokd
    2 days ago
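
The point in the comment above can be illustrated with a small plain-Java model. This is only a sketch under stated assumptions: `ReplayModel` and `itemsPerSubscriber` are hypothetical names invented for this illustration, and the class is a deliberately simplified stand-in, not RxJava's actual replay implementation. It caches what the upstream emitted once and hands every subscriber its own pass over that cache, so each subscriber sees 3 items regardless of how many subscribed before it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of a replay cache; NOT RxJava's implementation.
final class ReplayModel<T> {
    private final List<T> source;                     // upstream items
    private final List<T> cache = new ArrayList<>();  // items received from upstream
    private boolean connected = false;

    ReplayModel(List<T> source) {
        this.source = source;
    }

    // Each subscriber gets its own pass over the cache, independent of earlier subscribers.
    void subscribe(Consumer<T> consumer) {
        if (!connected) {
            // First subscription: consume the upstream once and remember what it emitted.
            cache.addAll(source);
            connected = true;
        }
        for (T item : cache) {
            consumer.accept(item);
        }
    }

    // Demo helper: subscribe the given number of consumers and record how many items each got.
    static List<Integer> itemsPerSubscriber(int subscribers) {
        ReplayModel<Integer> replay = new ReplayModel<>(Arrays.asList(1, 2, 3));
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < subscribers; i++) {
            List<Integer> received = new ArrayList<>();
            replay.subscribe(received::add);
            counts.add(received.size());
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(itemsPerSubscriber(3)); // prints [3, 3, 3]
    }
}
```

Under the expectation described in the question, the n-th subscriber would instead receive 3 × n items, which is where the figure of 300 items for the 100th consumer comes from.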















rx-java2






edited 2 days ago by akarnokd
asked 2 days ago by j2emanue












1 Answer



accepted










Not sure why you wanted this exotic behavior, but you can repeat with an ever increasing number:



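// Imports assumed for RxJava 2 on Android:
// import io.reactivex.Observable;
// import io.reactivex.functions.Consumer;
// import java.util.ArrayList;
// import java.util.List;
// import java.util.concurrent.atomic.AtomicInteger;
// import android.util.Log;

List<Integer> list = new ArrayList<Integer>();
for (int j = 1; j <= 3; j++) {
    list.add(j);
}

// Counts subscriptions; the n-th subscriber repeats the sequence n times.
AtomicInteger count = new AtomicInteger();

// defer runs the lambda once per subscriber, so count is incremented per subscription.
Observable<Integer> observable =
        Observable.defer(() -> {
            return Observable.fromIterable(list)
                    .replay()
                    .autoConnect()
                    .repeat(count.incrementAndGet());
        });

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer1:", "" + integer);
    }
});

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer2:", "" + integer);
    }
});

observable.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.v("consumer3:", "" + integer);
    }
});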
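
To see what this per-subscription counter produces without an Android or RxJava setup, here is a plain-Java model of the same idea. It is only a sketch: `RepeatPerSubscriberModel` and `runThreeSubscribers` are hypothetical names made up for the illustration, and the class imitates "decide the repeat count at subscription time" rather than calling any RxJava operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Plain-Java model of "defer + repeat(n)". Names are hypothetical; no RxJava involved.
final class RepeatPerSubscriberModel {
    private final List<Integer> items;
    private final AtomicInteger count = new AtomicInteger();

    RepeatPerSubscriberModel(List<Integer> items) {
        this.items = items;
    }

    // Models defer: the repeat count is decided when a consumer subscribes.
    void subscribe(Consumer<Integer> consumer) {
        int times = count.incrementAndGet();
        for (int round = 0; round < times; round++) {
            for (Integer item : items) {
                consumer.accept(item);
            }
        }
    }

    // Subscribes three collectors in a row and returns what each one received.
    static List<List<Integer>> runThreeSubscribers() {
        RepeatPerSubscriberModel source =
                new RepeatPerSubscriberModel(Arrays.asList(1, 2, 3));
        List<List<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<Integer> received = new ArrayList<>();
            source.subscribe(received::add);
            results.add(received);
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [1, 2, 3, 1, 2, 3], [1, 2, 3, 1, 2, 3, 1, 2, 3]]
        System.out.println(runThreeSubscribers());
    }
}
```

The three result lists have 3, 6 and 9 items, matching the output the question asked for: the first consumer sees the list once, the second twice, the third three times.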





  • What is the difference between replay method and cache method in your answer?
    – j2emanue
    2 days ago










  • stackoverflow.com/questions/32522017/…
    – akarnokd
    2 days ago










  • I see. replay().autoConnect() is the equivalent of cache(). Thanks.
    – j2emanue
    2 days ago











answered 2 days ago by akarnokd











