Buffer messages until an observer connects

752 views
Skip to first unread message

Rui Gonçalves

unread,
Oct 15, 2016, 11:34:41 AM10/15/16
to RxJava
In my project, I need to buffer data emitted by a hot observable until the first observer subscribes. Ideally, once the first observer subscribes the "buffer" is emptied, as future observers do *not* need to receive the old data (in fact, it can be assumed that only one observer will ever subscribe).

At the time, I couldn't find a way to do that using only the built-in combinators and methods, so I dropped the "buffer only until the first subscriber appears" requirement and did this way with RxScala 0.26.2 (RxJava 1.1.6):

import rx.lang.scala._

val subject = Subject[Int]()
val stream = subject.replay
stream.connect

subject.onNext(4)
stream.foreach(x => println("element: " + x)) // prints "element: 4"

Now, I'm trying to update RxScala to version 0.26.3 (RxJava 1.2.0), but the code above does not work anymore:

stream.foreach(x => println("element: " + x)) // prints "element: 4"
rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
  at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
  at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
  at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
  at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
  at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
  at rx.internal.operators.NotificationLite.accept(NotificationLite.java:149)
  at rx.internal.operators.OperatorReplay$UnboundedReplayBuffer.replay(OperatorReplay.java:898)
  (...)
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
  at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:307)
  at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:219)
  at rx.subjects.PublishSubject.onNext(PublishSubject.java:72)
  at rx.lang.scala.Subject$class.onNext(Subject.scala:36)
  at rx.lang.scala.subjects.PublishSubject.onNext(PublishSubject.scala:48)
  ... 37 more

Although I'm not sure of what changed, I suppose this has to do with the change in the backpressure behavior of PublishSubject described in https://github.com/ReactiveX/RxJava/releases/tag/v1.1.9. Taking this opportunity to improve my code, is there a way to achieve what I originally wanted? Even if that's not possible, can you explain how can I prevent that exception?

Thank you in advance for any help!

Rui Gonçalves

unread,
Oct 15, 2016, 11:37:34 AM10/15/16
to RxJava
A minor correction to the post above, in RxScala 0.26.3 the element is *not* printed as the comment says, the only thing that happens is the exception throw.

Dávid Karnok

unread,
Oct 15, 2016, 11:47:38 AM10/15/16
to Rui Gonçalves, RxJava
Have you looked at UnicastSubject? It buffers elements until exactly one Observer subscribes and replays the buffered values and any subsequent ones the subject receives.
--
Best regards,
David Karnok

Rui Gonçalves

unread,
Oct 15, 2016, 12:17:32 PM10/15/16
to Dávid Karnok, RxJava
That is exactly what I was looking for, good thing it got promoted to experimental. I'll use it directly and when I have the time I'll add a wrapper for it to RxScala and open a pull request. Thank you for the lightning fast response!

Reply all
Reply to author
Forward
0 new messages