RxScala Implicits missing backpressure


ljou...@dinogroup.eu

Jun 23, 2017, 10:27:01 PM
to mongodb-user
Hi,

The GitHub examples repository contains an adapter from a MongoDB Scala driver Observable to an rx producer, but it doesn't appear to support backpressure, as this simple test shows (the observableToRxObservable implicit must be imported for it to compile):

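import org.mongodb.scala.Observable
import org.scalatest.{FlatSpec, Matchers}
import rx.lang.{scala => rx}
import scala.concurrent.duration._ // provides .milliseconds

// The observableToRxObservable implicit from the examples repository must also be in scope.
class RxBackpressureSpec extends FlatSpec with Matchers {
  "Implicit conversion to RX" should "respect backpressure" in {
    val count = 1000
    // Zip the fast MongoDB observable with a slow rx interval (one tick every 100 ms).
    val o = (Observable (1 to count)) zip (rx.Observable interval (100.milliseconds) take count)
    o.countLong.toBlocking.last shouldEqual count
  }
}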

The test fails with a MissingBackpressureException.

Has anybody else noticed this? Any way to work around this?


ljou...@dinogroup.eu

Jun 26, 2017, 4:13:51 AM
to mongodb-user
I should add that this is with rxscala 0.26.4, and the mongodb scala driver 2.1.0.

I've tried replacing the mongodb observable with an rx observable, i.e.:

Observable (1 to count)
with
rx.Observable from (1 to count)

and that works fine.

It appears that the rx zip operator calls request() twice: once in setProducer() and once in onStart(). The error is thrown as soon as the n values from the first call have been emitted and the adapter starts servicing the second request. For example, if setProducer() and onStart() each call request(128), it throws on the 129th emission.
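
For reference, the RxJava 1.x Producer contract treats request(n) calls as additive, so two calls of request(128) should leave a producer free to emit 256 items. The following is a minimal sketch of that bookkeeping in plain Scala. DemandCounter, tryEmit and outstanding are hypothetical names made up for illustration; this is not the code of the adapter in the examples repository or of RxJava itself.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.annotation.tailrec

// Hypothetical model of a producer's demand bookkeeping (illustration only).
final class DemandCounter {
  private val requested = new AtomicLong(0L)

  // Each call adds to the outstanding demand instead of replacing it.
  @tailrec
  def request(n: Long): Unit = {
    require(n > 0, "n must be positive")
    val current = requested.get
    val sum = current + n
    val next = if (sum < 0) Long.MaxValue else sum // cap on overflow
    if (!requested.compareAndSet(current, next)) request(n)
  }

  // Consumes one unit of demand; returns false when nothing is outstanding.
  @tailrec
  def tryEmit(): Boolean = {
    val current = requested.get
    if (current == 0L) false
    else if (current == Long.MaxValue) true // unbounded demand is never decremented
    else if (requested.compareAndSet(current, current - 1)) true
    else tryEmit()
  }

  def outstanding: Long = requested.get
}

object DemandCounterDemo extends App {
  val demand = new DemandCounter
  demand.request(128) // e.g. the call made from setProducer()
  demand.request(128) // e.g. the call made from onStart()
  // Emit until the demand is exhausted and count the successful emissions.
  val emitted = Iterator.continually(demand.tryEmit()).takeWhile(identity).size
  println(emitted) // prints 256
}
```

An adapter that tracked demand this way would not stop or fail at the 129th item, which may be a useful thing to compare against when reading the adapter's request() handling.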

Ross Lawley

Jul 4, 2017, 4:30:47 AM
to mongodb-user
The issue here is that interval doesn't (and can't) support backpressure, so you need to use onBackpressureDrop or a similar operator to handle the overflow of values.
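
As a rough sketch of where such an operator would go, assuming RxScala 0.26.x on the classpath: BackpressureDropSketch and zippedCount are hypothetical names, the interval and count are shortened so the example finishes quickly, and rx's own Observable.from stands in for the driver observable. It has not been checked against the adapter in the examples repository.

```scala
import scala.concurrent.duration._
import rx.lang.scala.Observable

// Illustration only: shows the placement of onBackpressureDrop on the interval side.
object BackpressureDropSketch {
  def zippedCount(count: Int): Long = {
    // Stand-in for the fast source (the driver observable in the original test).
    val fast = Observable.from(1 to count)
    // interval is timer-driven and ignores downstream demand;
    // onBackpressureDrop discards ticks the consumer has not requested.
    val ticks = Observable.interval(10.milliseconds).onBackpressureDrop.take(count)
    fast.zip(ticks).countLong.toBlocking.last
  }

  def main(args: Array[String]): Unit =
    println(zippedCount(10))
}
```

onBackpressureBuffer is the alternative when dropped values are not acceptable, at the cost of unbounded buffering if the consumer stays slower than the source.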

Ross
