android - RXJava - buffer observable 1 until observable 2 emits one item -


i want following behaviour:

observablemain should buffers items until observableresumed emits value. observablemain should emit buffered , feature values...

what in activity's oncreate:

 publishsubject<t> subject = ...; // create subject emit items , subscribe   // 1) create main observable subject  final observable<t> observablemain = subject             .subscribeon(schedulers.io())             .observeon(androidschedulers.mainthread());   // 2) use custom base class in can register listeners   // onresume event , can query isresumed state!   // call object pauseresumeprovider!   // 2.1) create observable, emits value if activity resumed  final observable<boolean> obsisresumed = observable             .defer(() -> observable.just(pauseresumeprovider.isactivityresumed()))             .skipwhile(aboolean -> aboolean != true);   // 2.2) create second observable, emits value activity resumed  final observable<boolean> obsonresumed = observable.create(new observable.onsubscribe<boolean>()     {         @override         public void call(final subscriber<? super boolean> subscriber)         {             pauseresumeprovider.addpauseresumelistener(new ipauseresumelistener() {                 @override                 public void onresume() {                     pauseresumeprovider.removepauseresumelistener(this);                     subscriber.onnext(true);                     subscriber.oncompleted();                 }                  @override                 public void onpause() {                  }             });         }     });  // 2.3) combine resumed observables , emit first value can final observable<boolean> observableresumed = observable         .concat(obsisresumed, obsonresumed)         .first();  // 3) here i'm stuck // 3 - variant 1: observable<t> observable = observablemain             .buffer(observableresumed)             .concatmap(values -> observable.from(values)); // 3 - variant 2: // observable<t> observable = observablemain.delay(t -> observableresumed);  // 4) emit events my subject... // event lost! subject.onnext("test in oncreate"); 

problem

all items send subject after activity resumed working, items before lost (at least delay solution). can't desired behaviour work. how correctly solve that?

have source replayed , use delaysubscription trigger real subscription.

publishsubject<integer> emitnow = publishsubject.create();  connectableobservable<t> co = source.replay();  co.subscribe();  co.connect();  co.delaysubscription(emitnow).subscribe(...);  emitnow.onnext(1); 

edit:

here gist operator can lift sequence can pause , resume emissions upstream.


Comments

Popular posts from this blog

Load Balancing in Bluemix using custom domain and DNS SRV records -

oracle - pls-00402 alias required in select list of cursor to avoid duplicate column names -

python - Consider setting $PYTHONHOME to <prefix>[:<exec_prefix>] error -