Newbie : how to query mongoDB asynchronously in "mongo-scala-driver" % "1.1.1".

423 views
Skip to first unread message

shahab mokarizadeh

unread,
Jun 20, 2016, 2:39:13 AM6/20/16
to mongodb-user
Hi,

I am quite new to querying MongoDB in Scala . I am using : "org.mongodb.scala" %% "mongo-scala-driver" % "1.1.1".
I have the following piece of code which runs but "onNext" is never executed (it does not print anything).
Does anyone knows how to fix this?

best ,
Shahab

val doc: Document = Document("data0" -> 0, "data1" ->  "506", "data2" -> "err" )


collection.find(doc).subscribe(new Observer[Document](){

var batchSize: Long = 10
var seen: Long = 0
var subscription: Option[Subscription] = None

override def onSubscribe(subscription: Subscription): Unit = {
this.subscription = Some(subscription)
subscription.request(batchSize)
}

override def onNext(result: Document): Unit = {
println(result.toJson())
seen += 1
if (seen == batchSize) {
seen = 0
subscription.get.request(batchSize)
}
}

Wan Bachtiar

unread,
Jul 5, 2016, 9:30:09 PM7/5/16
to mongodb-user

Does anyone knows how to fix this?

Hi Shahab,

First make sure that you have the document you are searching exist in MongoDB - note that you are searching for data1 with a string value of “506”, not integer of 506. You could also add onComplete() to print something regardless of find() matches document(s) or not.

Also it is likely that you code finishes before the Observable is completed, as it is asynchronous.
The snippet below modifies your code example, and waiting for the data to return.

val doc: Document = Document("data0" -> 0, "data1" ->  "506", "data2" -> "err" )

val observable = collection.find(doc)
observable.subscribe(new Observer[Document](){
  
var batchSize: Long = 10
  var seen: Long = 0
  var subscription: Option[Subscription] = None

  override def onSubscribe(subscription: Subscription): Unit = {
    this.subscription = Some(subscription)
    subscription.request(batchSize)
  }

  override def onNext(result: Document): Unit = {
    println(result.toJson())
    seen += 1
    if (seen == batchSize) {
      seen = 0

      subscription.get.request(batchSize)
    }   
  }
  override def onError(e: Throwable): Unit = println(s"Error: $e")
  override def onComplete(): Unit = println("Completed")
})
// wait for 3 seconds.
Await.ready(observable.toFuture, 3 seconds)

The snippet above is written in Scala v2.11.7 using mongo-scala-driver v1.1

See also MongoDB Scala Driver: Getting Started for more info and examples.

Regards,

Wan.

Reply all
Reply to author
Forward
0 new messages