android - RXJava - buffer observable 1 until observable 2 emits one item -
i want following behaviour:
observablemain
should buffers items until observableresumed
emits value. observablemain
should emit buffered , feature values...
what in activity's
oncreate
:
publishsubject<t> subject = ...; // create subject emit items , subscribe // 1) create main observable subject final observable<t> observablemain = subject .subscribeon(schedulers.io()) .observeon(androidschedulers.mainthread()); // 2) use custom base class in can register listeners // onresume event , can query isresumed state! // call object pauseresumeprovider! // 2.1) create observable, emits value if activity resumed final observable<boolean> obsisresumed = observable .defer(() -> observable.just(pauseresumeprovider.isactivityresumed())) .skipwhile(aboolean -> aboolean != true); // 2.2) create second observable, emits value activity resumed final observable<boolean> obsonresumed = observable.create(new observable.onsubscribe<boolean>() { @override public void call(final subscriber<? super boolean> subscriber) { pauseresumeprovider.addpauseresumelistener(new ipauseresumelistener() { @override public void onresume() { pauseresumeprovider.removepauseresumelistener(this); subscriber.onnext(true); subscriber.oncompleted(); } @override public void onpause() { } }); } }); // 2.3) combine resumed observables , emit first value can final observable<boolean> observableresumed = observable .concat(obsisresumed, obsonresumed) .first(); // 3) here i'm stuck // 3 - variant 1: observable<t> observable = observablemain .buffer(observableresumed) .concatmap(values -> observable.from(values)); // 3 - variant 2: // observable<t> observable = observablemain.delay(t -> observableresumed); // 4) emit events my subject... // event lost! subject.onnext("test in oncreate");
problem
all items send subject
after activity
resumed working, items before lost (at least delay
solution). can't desired behaviour work. how correctly solve that?
have source replayed , use delaysubscription trigger real subscription.
publishsubject<integer> emitnow = publishsubject.create(); connectableobservable<t> co = source.replay(); co.subscribe(); co.connect(); co.delaysubscription(emitnow).subscribe(...); emitnow.onnext(1);
edit:
here gist operator can lift
sequence can pause , resume emissions upstream.
Comments
Post a Comment