scala driver 1.1.1 flatMap bug


Luciano Joublanc

1. 7. 2016, 7:45:24
to: mongodb-user
Hi, 

I'm trying to get to grips with the new Scala driver, and I keep getting all sorts of errors when composing org.mongodb.scala.Observable. For example:


def just[T](t: T*): Observable[T] = {
  new Observable[T] {
    override def subscribe(observer: Observer[_ >: T]): Unit = {
      for (tee <- t) { observer onNext tee }
      observer onComplete
      ()
    }
  }
}


Then in the console (import Async, and duration._):

scala> Await result ( just(1,2,3,4) toFuture, 3 seconds)
warning: there were two feature warnings; re-run with -feature for details
res4: Seq[Int] = List(1, 2, 3, 4)

The above works, however:

scala> Await result ( just(1,2,3,4) flatMap { i => just(i,i) } toFuture, 3 seconds)

warning: there were two feature warnings; re-run with -feature for details
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:344)
  at scala.None$.get(Option.scala:342)
  at org.mongodb.scala.internal.FlatMapObservable$$anon$1$$anon$3.onComplete(FlatMapObservable.scala:97)
  at com.dinogroup.scarctic.util.package$$anon$1.subscribe(util.scala:14)



Any ideas? This seems like a really simple thing to do. Is this a bug, or am I just not using it correctly?

Ross Lawley

1. 7. 2016, 9:57:55
to: mongodb-user
Hi Luciano,

That's a great question, and it looks like you're really close to a solution. The issue here is that Observers must be given a Subscription, and it's the Subscription that handles passing data to the Observer, not the Observable. The error is a result of this contract not being fulfilled. I've added SCALA-248 to ensure that we explicitly check the state and provide a helpful error message when flatMapping.
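
To make the failure mode concrete, here is a minimal, self-contained sketch of how a None.get like the one in the stack trace can arise. The traits and the StatefulObserver class below are hypothetical stand-ins written for this illustration; they are not the driver's types and not the actual FlatMapObservable code. The only assumption is that an observer stores its Subscription when onSubscribe is called and reads it back later.

```scala
// Hypothetical stand-ins for illustration only, NOT the driver's real types.
trait Subscription {
  def request(n: Long): Unit
  def isUnsubscribed: Boolean
}

trait Observer[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(t: T): Unit
  def onComplete(): Unit
}

// An observer that remembers its subscription and needs it again on completion.
class StatefulObserver[T] extends Observer[T] {
  private var subscription: Option[Subscription] = None
  var received: Vector[T] = Vector.empty
  var completed: Boolean = false

  def onSubscribe(s: Subscription): Unit = {
    subscription = Some(s)
    s.request(Long.MaxValue)
  }

  def onNext(t: T): Unit = received :+= t

  def onComplete(): Unit = {
    // Throws NoSuchElementException("None.get") if onSubscribe was never called.
    if (!subscription.get.isUnsubscribed) completed = true
  }
}

object ContractDemo {
  // Mimics the original `just`: pushes items directly and never calls onSubscribe.
  def pushWithoutSubscribing(observer: Observer[Int]): Unit = {
    Seq(1, 2, 3).foreach(observer.onNext)
    observer.onComplete()
  }

  // Captures the exception, if any, raised by the contract-breaking push.
  def failure: Option[Throwable] =
    scala.util.Try(pushWithoutSubscribing(new StatefulObserver[Int])).failed.toOption
}
```

In this sketch the items arrive fine, and the problem only shows up when onComplete reaches for a Subscription that was never supplied.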

Here is an example of the just method honouring the contract by handing a Subscription to the Observer and then using that Subscription to interact with the Observer:

def just[T](t: T*): Observable[T] = {
  new Observable[T] {
    override def subscribe(observer: Observer[_ >: T]): Unit = {
      observer.onSubscribe(new Subscription {
        @volatile
        private var remaining: Iterable[T] = t
        @volatile
        private var subscribed: Boolean = true

        override def isUnsubscribed: Boolean = !subscribed

        override def request(n: Long): Unit = {
          if (n < 1) {
            throw new IllegalArgumentException(s"Number requested cannot be negative: $n")
          }
          var counter = n
          while (subscribed && counter > 0 && remaining.nonEmpty) {
            val head = remaining.head
            remaining = remaining.tail
            observer.onNext(head)
            counter -= 1
          }

          if (subscribed && remaining.isEmpty) {
            observer.onComplete()
          }
        }

        override def unsubscribe(): Unit = subscribed = false
      })
    }
  }
}

These interfaces are similar to the Publisher, Subscription and Subscriber interfaces from the Reactive Streams JVM implementation. However, we prefer the name Observable to Publisher and Observer to Subscriber, purely for readability.
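
To see the request-driven flow in isolation, the following is a minimal sketch that runs without the driver. The Subscription, Observer and Observable traits here are hand-rolled, simplified stand-ins (assumptions for illustration, not the driver's or Reactive Streams' actual definitions), and Recorder is a hypothetical observer that logs what it sees. The `done` flag is a small addition in this sketch so that completion is signalled at most once.

```scala
import scala.collection.mutable.ListBuffer

// Simplified stand-in interfaces, assumed for this sketch only.
trait Subscription {
  def request(n: Long): Unit
  def unsubscribe(): Unit
  def isUnsubscribed: Boolean
}

trait Observer[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(t: T): Unit
  def onComplete(): Unit
}

trait Observable[T] {
  def subscribe(observer: Observer[T]): Unit
}

object DemandDemo {
  // Emits items only in response to request(n) on the Subscription.
  def just[T](items: T*): Observable[T] = new Observable[T] {
    def subscribe(observer: Observer[T]): Unit =
      observer.onSubscribe(new Subscription {
        private var remaining: List[T] = items.toList
        private var subscribed = true
        private var done = false

        def isUnsubscribed: Boolean = !subscribed
        def unsubscribe(): Unit = subscribed = false

        def request(n: Long): Unit = {
          require(n > 0, s"Number requested must be positive: $n")
          var counter = n
          while (subscribed && counter > 0 && remaining.nonEmpty) {
            val head = remaining.head
            remaining = remaining.tail
            observer.onNext(head)
            counter -= 1
          }
          if (subscribed && remaining.isEmpty && !done) {
            done = true
            observer.onComplete()
          }
        }
      })
  }

  // Hypothetical observer that records every signal it receives.
  class Recorder[T] extends Observer[T] {
    var subscription: Option[Subscription] = None
    val events: ListBuffer[String] = ListBuffer.empty
    def onSubscribe(s: Subscription): Unit = { subscription = Some(s); events += "subscribed" }
    def onNext(t: T): Unit = events += s"next $t"
    def onComplete(): Unit = events += "complete"
  }

  def run(): List[String] = {
    val recorder = new Recorder[Int]
    just(1, 2, 3).subscribe(recorder)            // only onSubscribe fires here
    recorder.subscription.foreach(_.request(2))  // delivers 1 and 2
    recorder.subscription.foreach(_.request(5))  // delivers 3, then completes
    recorder.events.toList
  }
}
```

Calling DemandDemo.run() shows that nothing is delivered until the observer asks for it: subscribing alone yields only the "subscribed" event, and the items and completion follow the two request calls.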

I hope that helps,

Ross
