In my project, I need to buffer data emitted by a hot observable until the first observer subscribes. Ideally, once the first observer subscribes the "buffer" is emptied, as future observers do *not* need to receive the old data (in fact, it can be assumed that only one observer will ever subscribe).
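To make the intended semantics concrete, here is a minimal sketch in plain Scala, without Rx. The class `BufferUntilFirstSubscriber` and the `BufferDemo` object are hypothetical names made up for illustration; they are not part of RxScala or RxJava, and the sketch ignores errors, completion, and unsubscription.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (not an RxScala/RxJava API): it only illustrates
// the "buffer until the first subscriber appears" behavior described above.
final class BufferUntilFirstSubscriber[A] {
  private val pending = ArrayBuffer.empty[A]
  private var observers = List.empty[A => Unit]

  // Buffer the value while nobody is subscribed; otherwise deliver it live.
  def onNext(value: A): Unit = synchronized {
    if (observers.isEmpty) pending += value
    else observers.foreach(observer => observer(value))
  }

  // The first subscriber drains the buffer. The buffer is then empty,
  // so later subscribers only see values emitted after they subscribe.
  def subscribe(observer: A => Unit): Unit = synchronized {
    observers = observers :+ observer
    pending.foreach(observer)
    pending.clear()
  }
}

object BufferDemo {
  def main(args: Array[String]): Unit = {
    val source = new BufferUntilFirstSubscriber[Int]
    source.onNext(4)                                // buffered, no subscriber yet
    source.subscribe(x => println("first: " + x))   // prints "first: 4"
    source.subscribe(x => println("second: " + x))  // receives nothing old
    source.onNext(5)                                // prints "first: 5", then "second: 5"
  }
}
```

What I am looking for is a way to get this behavior from the library's own operators rather than from a hand-written class like this one.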
At the time, I couldn't find a way to do that using only the built-in combinators and methods, so I dropped the "buffer only until the first subscriber appears" requirement and did it this way with RxScala 0.26.2 (RxJava 1.1.6):
import rx.lang.scala._
val subject = Subject[Int]()
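val stream = subject.replay // ConnectableObservable that caches every element
stream.connect // start caching immediately, before anyone subscribes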
subject.onNext(4)
stream.foreach(x => println("element: " + x)) // prints "element: 4"
Now I'm trying to update RxScala to version 0.26.3 (RxJava 1.2.0), but the code above no longer works. The last line now throws instead of printing "element: 4":
stream.foreach(x => println("element: " + x))
rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:149)
at rx.internal.operators.OperatorReplay$UnboundedReplayBuffer.replay(OperatorReplay.java:898)
(...)
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:307)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:219)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:72)
at rx.lang.scala.Subject$class.onNext(Subject.scala:36)
at rx.lang.scala.subjects.PublishSubject.onNext(PublishSubject.scala:48)
... 37 more
I'm not sure exactly what changed, but I suspect it has to do with the change in the backpressure behavior of PublishSubject described in https://github.com/ReactiveX/RxJava/releases/tag/v1.1.9. Taking this opportunity to improve my code: is there a way to achieve what I originally wanted? And even if that's not possible, can you explain how I can prevent that exception?
Thank you in advance for any help!