val doc: Document = Document("data0" -> 0, "data1" -> "506", "data2" -> "err")
collection.find(doc).subscribe(new Observer[Document]() {
  var batchSize: Long = 10
  var seen: Long = 0
  var subscription: Option[Subscription] = None
  override def onSubscribe(subscription: Subscription): Unit = {
    this.subscription = Some(subscription)
    subscription.request(batchSize)
  }
  override def onNext(result: Document): Unit = {
    println(result.toJson())
    seen += 1
    if (seen == batchSize) {
      seen = 0
      subscription.get.request(batchSize)
    }
  }
Does anyone know how to fix this?
Hi Shahab,
First, make sure that the document you are searching for actually exists in MongoDB. Note that you are searching for data1 with the string value "506", not the integer 506. You could also implement onComplete() to print something whether or not find() matches any documents.
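To illustrate why the value type matters, here is a minimal sketch that uses plain Scala Maps as stand-ins for documents. The object and method names are made up for illustration and no driver is involved; the real matching happens server-side on BSON values, so treat this only as an analogy.

```scala
object TypeSensitiveMatch {
  // Hypothetical stand-in for query matching: every field in the filter
  // must be present in the document with an equal value.
  def matches(doc: Map[String, Any], filter: Map[String, Any]): Boolean =
    filter.forall { case (key, value) => doc.get(key).contains(value) }

  def main(args: Array[String]): Unit = {
    // Assumed stored document where data1 is the integer 506.
    val stored: Map[String, Any] = Map("data0" -> 0, "data1" -> 506, "data2" -> "err")
    // Filter from the question, where data1 is the string "506".
    val filter: Map[String, Any] = Map("data0" -> 0, "data1" -> "506", "data2" -> "err")

    println(matches(stored, filter))                       // string vs. integer: no match
    println(matches(stored, filter.updated("data1", 506))) // integer vs. integer: match
  }
}
```

If the stored value really is an integer, changing the filter to use 506 without quotes would be the thing to try.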
It is also likely that your code finishes before the Observable completes, because the query runs asynchronously.
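The effect can be seen in isolation with plain Scala Futures. This is only a sketch: slowQuery is a hypothetical stand-in for an asynchronous query, and no MongoDB is involved.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSketch {
  // Hypothetical stand-in for an asynchronous query.
  def slowQuery(): Future[Seq[String]] = Future {
    Thread.sleep(200) // simulate latency
    Seq("""{"data1": "506"}""")
  }

  def main(args: Array[String]): Unit = {
    val results = slowQuery()
    // Without this blocking call, main could return before the Future
    // completes. The global execution context uses daemon threads, so the
    // JVM would then exit without printing anything.
    val docs = Await.result(results, 3.seconds)
    docs.foreach(println)
  }
}
```

Blocking like this is fine for a small test program; in a longer-running application you would normally react in the callbacks instead.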
The snippet below modifies your code example so that it waits for the data to return.
import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration._

// `collection` is the MongoCollection[Document] from your own code.
val doc: Document = Document("data0" -> 0, "data1" -> "506", "data2" -> "err")
val observable = collection.find(doc)
observable.subscribe(new Observer[Document]() {
  var batchSize: Long = 10
  var seen: Long = 0
  var subscription: Option[Subscription] = None

  // Keep the subscription and request the first batch.
  override def onSubscribe(subscription: Subscription): Unit = {
    this.subscription = Some(subscription)
    subscription.request(batchSize)
  }

  // Print each document and request the next batch once this one is consumed.
  override def onNext(result: Document): Unit = {
    println(result.toJson())
    seen += 1
    if (seen == batchSize) {
      seen = 0
      subscription.foreach(_.request(batchSize))
    }
  }

  override def onError(e: Throwable): Unit = println(s"Error: $e")
  override def onComplete(): Unit = println("Completed")
})
// Wait up to 3 seconds for the results to arrive.
Await.ready(observable.toFuture(), 3.seconds)
The snippet above is written in Scala v2.11.7 using mongo-scala-driver v1.1.
See also MongoDB Scala Driver: Getting Started for more info and examples.
Regards,
Wan.