reactivemongo-akkastream 0.12-RC1 Premature Termination of Tailable Cursor Streams


Stephen Couchman

Aug 6, 2016, 5:29:32 PM
to ReactiveMongo - http://reactivemongo.org
Hi

I've been trying to run a tailable cursor with Akka Streams like so:

    import reactivemongo.akkastream._

    cursorFlattener.flatten(
      collectionFuture.map { c =>
        c.find(BSONDocument.empty)
          .options(QueryOpts().tailable.awaitData)
          .cursor[MyDocumentType]()
      }
    ).documentSource()


While it successfully streams through all the entries already in the database, it seems to stop processing at that point instead of switching to tail mode, and I immediately find onUpstreamFinish called in later stream graphs.

Thought I'd post here first in case this was either a known problem or I was using it wrong.

Thanks for any suggestions,

~ Steve

Cédric Chantepie

Aug 6, 2016, 6:35:09 PM
to ReactiveMongo - http://reactivemongo.org
That's not currently supported.

Cédric Chantepie

Aug 13, 2016, 8:33:30 AM
to ReactiveMongo - http://reactivemongo.org
Have you tried with QueryOpts.tailable.awaitData?

Stephen Couchman

Aug 13, 2016, 3:08:44 PM
to ReactiveMongo - http://reactivemongo.org
I think so. The code I posted at the start of the thread does include it, unless I was using it wrong?

    import reactivemongo.akkastream._

    cursorFlattener.flatten(
      collectionFuture.map { c =>
        c.find(BSONDocument.empty)
          .options(QueryOpts().tailable.awaitData)
          .cursor[MyDocumentType]()
      }
    ).documentSource()



Cédric Chantepie

Aug 13, 2016, 7:00:27 PM
to ReactiveMongo - http://reactivemongo.org
First you need to fix the following by replacing .map with .flatMap and removing the cursor flattening.

collectionFuture.map { c => c.find(BSONDocument.empty) ...
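
For readers following the .map versus .flatMap point, here is a minimal sketch that uses only scala.concurrent.Future and no ReactiveMongo types. The names collectionName and countAsync are hypothetical stand-ins, not part of any library. It only shows the general rule: .map fits a function returning a plain value, while .flatMap fits a function that itself returns a Future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapVsFlatMap {
  // Hypothetical stand-in for a value that is resolved asynchronously.
  def collectionName: Future[String] = Future.successful("events")

  // Hypothetical asynchronous step: it returns a Future itself.
  def countAsync(name: String): Future[Int] = Future.successful(name.length)

  // map with a plain function: the result is a single Future[Int].
  def mapped: Future[Int] = collectionName.map(name => name.length)

  // map with a Future-returning function nests the futures.
  def nested: Future[Future[Int]] = collectionName.map(countAsync)

  // flatMap removes that nesting; it only applies when the function returns a Future.
  def flattened: Future[Int] = collectionName.flatMap(countAsync)

  def main(args: Array[String]): Unit = {
    println(Await.result(mapped, 1.second))
    println(Await.result(flattened, 1.second))
  }
}
```

In this sketch both calls yield the same number; the difference is only in the shape of the function passed in, which is the distinction the question about "a second future" turns on.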



Stephen Couchman

Aug 13, 2016, 10:24:00 PM
to ReactiveMongo - http://reactivemongo.org
I'm sorry, I'm confused.  Why would I want flatMap?  I'm not working with a second future.

I've tried again with code resembling the following (this is for a Play chunked/streaming response):

    import reactivemongo.akkastream._

    collectionFuture.map { c =>
      val source = c.find(BSONDocument.empty)
        .options(QueryOpts().tailable.awaitData)
        .cursor[MyDocumentType]()
        .documentSource()
        .doSomeStreamOperations()
        .watchTermination() { (notUsed, futureState) =>
          logger.info("Started watching for stream completion.")
          futureState.onComplete {
            case Success(Done) =>
              logger.info("Stream completed successfully")
            case Failure(e) =>
              logger.error("Failed to complete stream", e)
          }
          notUsed
        }

      Ok.chunked(source)
    }



Unfortunately, it still terminates prematurely (but with a "success" reported by the stream monitor in the code above) as soon as it has delivered all of the data already in the database. I previously had a version of my code running Iteratees that tailed successfully using this setup (it even used the Play adapter from Iteratees to Streams; the stream logic has not changed, I just removed all the Iteratees glue), but I was hoping to get a pure-streams version working.

Cédric Chantepie

Aug 14, 2016, 5:42:05 PM
to ReactiveMongo - http://reactivemongo.org
Some tests have been added to the Akka Stream modules. They show no issue with tailable cursors.
Make sure you don't use a Sink that waits for the end of the stream (try Sink.foreach to debug/test).

Stephen Couchman

Aug 17, 2016, 3:23:05 AM
to ReactiveMongo - http://reactivemongo.org
The sink I'm using doesn't wait for the stream to end.

I upgraded to 0.12-RC2 and managed to run it in debug mode. I was able to figure out which line was causing the stream to terminate.


File "DocumentStage.scala"

    private val futureCB =
      getAsyncCallback((response: Try[Option[Response]]) => {
        response match {
          case Failure(reason) => onFailure(reason)

          case Success(resp @ Some(r)) => {
            last = None

            if (r.reply.numberReturned == 0) {
              completeStage()
            } else {
              val bulkIter = cursor.documentIterator(r).
                take(maxDocs - r.reply.startingFrom)

              nextD(r, bulkIter)
            }
          }

          case Success(_) => {
            kill()

            last = None
            completeStage()
          }
        }
      }).invoke _


It looks like it fails to wait for more data: after running out, it gets a "0 documents" response and terminates the stream. The cursor was created with QueryOpts().tailable.awaitData.


If it helps, the summary for r at that point is: Response(MessageHeader(36,164214512,3019,1),Reply(8,6385064307180,33361,0),LittleEndianHeapChannelBuffer(ridx=36, widx=36, cap=36),ResponseInfo(248322366))


Thanks
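
To make the behavior described above concrete, here is a small stand-alone simulation in plain Scala. It is not ReactiveMongo code and does not claim to show how the library handles this internally or how any later version changed it. The names Action, onBatch, and run are hypothetical. It only contrasts two rules for an empty batch: completing the stream (what the quoted branch on numberReturned == 0 does) versus polling again when the cursor is tailable.

```scala
object EmptyBatchSketch {
  // What a stage might do after receiving one batch from the server.
  sealed trait Action
  case object Emit extends Action      // push the documents downstream
  case object PollAgain extends Action // keep the stream open and ask again
  case object Complete extends Action  // finish the stream

  // Hypothetical decision rule (an assumption for illustration):
  // an empty batch ends a regular cursor, but a tailable one keeps waiting.
  def onBatch(numberReturned: Int, tailable: Boolean): Action =
    if (numberReturned > 0) Emit
    else if (tailable) PollAgain
    else Complete

  // Walks a scripted list of batches and collects documents until Complete
  // is reached or the script runs out.
  def run(batches: List[List[String]], tailable: Boolean): List[String] = {
    val out = List.newBuilder[String]
    val it = batches.iterator
    var done = false
    while (!done && it.hasNext) {
      val batch = it.next()
      onBatch(batch.size, tailable) match {
        case Emit      => out ++= batch
        case PollAgain => () // nothing to emit yet; move on to the next batch
        case Complete  => done = true
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Two existing documents, an empty reply, then a newly inserted document.
    val batches = List(List("a", "b"), Nil, List("c"))
    println(run(batches, tailable = false)) // List(a, b)
    println(run(batches, tailable = true))  // List(a, b, c)
  }
}
```

With the non-tailable rule, the empty reply ends the run before "c" is seen, which matches the symptom of a stream that completes "successfully" right after the existing data.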

Cédric Chantepie

Aug 21, 2016, 2:03:25 PM
to ReactiveMongo - http://reactivemongo.org
Hi, you can try the latest 0.12.0-SNAPSHOT.

Stephen Couchman

Aug 21, 2016, 6:25:41 PM
to ReactiveMongo - http://reactivemongo.org
Ah, yeah that seems to work great now.  Thanks!