The sink I'm using doesn't wait for the stream to end.
I upgraded to 0.12.RC2 and ran it with debugging enabled, which let me find the line that causes the stream to terminate.
File: DocumentStage.scala
    private val futureCB =
      getAsyncCallback((response: Try[Option[Response]]) => {
        response match {
          case Failure(reason) => onFailure(reason)
          case Success(resp @ Some(r)) => {
            last = None
            if (r.reply.numberReturned == 0) {
              completeStage()
            } else {
              val bulkIter = cursor.documentIterator(r).
                take(maxDocs - r.reply.startingFrom)
              nextD(r, bulkIter)
            }
          }
          case Success(_) => {
            kill()
            last = None
            completeStage()
          }
        }
      }).invoke _
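It looks like the stage fails to wait for more data: once the existing documents run out, it gets a "0 documents" response (r.reply.numberReturned == 0) and calls completeStage(), which terminates the stream. The cursor was created with QueryOpts().tailable.awaitData, so I expected it to keep waiting.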
If it helps, the value of r at that point was: Response(MessageHeader(36,164214512,3019,1),Reply(8,6385064307180,33361,0),LittleEndianHeapChannelBuffer(ridx=36, widx=36, cap=36),ResponseInfo(248322366))
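To make clear what I expected instead, here is a minimal sketch of the decision I think a tailable stage would need to make. It is a standalone model, not ReactiveMongo code: ReplyInfo, Action, TailableDecision and the tailable flag are all hypothetical names of mine. I am also assuming the Reply fields above are (flags, cursorID, startingFrom, numberReturned). It relies on the wire-protocol convention that a cursor ID of 0 means the server has closed the cursor.

```scala
// Hypothetical stand-in for the parts of the reply that matter here;
// this is NOT ReactiveMongo's Reply class.
final case class ReplyInfo(cursorID: Long, startingFrom: Int, numberReturned: Int)

// What the stage could do after receiving a reply (names are illustrative).
sealed trait Action
case object Complete extends Action                    // end the stream
case object PollAgain extends Action                   // ask the server for more
final case class EmitBatch(count: Int) extends Action  // push the documents downstream

object TailableDecision {
  // An empty batch ends the stream only if the cursor is not tailable
  // or the server has closed it (cursorID == 0).
  def onReply(reply: ReplyInfo, tailable: Boolean): Action =
    if (reply.numberReturned > 0) EmitBatch(reply.numberReturned)
    else if (tailable && reply.cursorID != 0L) PollAgain
    else Complete
}
```

With the values from my reply, TailableDecision.onReply(ReplyInfo(6385064307180L, 33361, 0), tailable = true) gives PollAgain, whereas the current code completes the stage whenever numberReturned is 0.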
Thanks