Error when query can return more documents than the batchSize

545 views
Skip to first unread message

kate.hyp...@gmail.com

unread,
Feb 2, 2021, 6:15:33 PM2/2/21
to ReactiveMongo - http://reactivemongo.org
Hello all,

I am having an issue whenever I have a query that can return more documents than the batchSize (Regardless of what I set the batch size to, or use the default).

It happens on every single query I've tried to make with a variety of selectors and projections. Using BSONCollections and `collect[List]` at the end (not streams). The error also persists regardless of setting a limit (if the limit is over the batch size or -1). Forcing a single batch does work, and I'm happy to do that, but my understanding is that is not a good practice and if the batch goes over 16mb it will fail.

This is the error I receive:

DatabaseException['Cursor session id (401cedd8-d754-3de3-be5d-68501e0544ce - 4wrEYPFdyaRlj/wDR3Gvey6KtXWmGsZMz0DPB8jBWA0=) is not the same as the operation context's session id (none)' (code = 13)]

I'm running Mongo 4.4.1 and have the issue on ReativeMongo versions 1.0.0 and 0.20.8. The issue persists with and without using transactions.

I feel like I'm missing something super obvious; is anyone able to point me in the right direction?

Thanks!

Cédric Chantepie

unread,
Feb 2, 2021, 7:42:15 PM2/2/21
to ReactiveMongo - http://reactivemongo.org
Without code/reproducer, it's hardly possible to give any answer.

kate.hyp...@gmail.com

unread,
Feb 2, 2021, 8:56:25 PM2/2/21
to ReactiveMongo - http://reactivemongo.org
Sorry, I should have included an example!

It does happen on every single `find` query I make, regardless of the collection, database, or mongo server my application is connected to (I've tried on several). Here's an example:

reactiveMongoApi.database.map(_.collection[BSONCollection]("accounts")).flatMap(collection => {

// This returns more than the default batchSize (which seems to be 100? If I limit it to 100 without specifying a batch size it works), and throws the mentioned error
collection.find(
selector = BSONDocument(
"created" -> BSONDocument("$lte" -> someDateTime.format(DateTimeFormatter.ISO_DATE_TIME))
),
projection = Option.empty[Account]
)
.cursor[Account]()
.collect[List](Int.MaxValue, Cursor.FailOnError[List[Account]]())

// This works, if I make the limit any larger it fails with the mentioned error
collection.find(
selector = BSONDocument(
"created" -> BSONDocument("$lte" -> someDateTime.format(DateTimeFormatter.ISO_DATE_TIME))
),
projection = Option.empty[Account]
)
.cursor[Account]()
.collect[List](100, Cursor.FailOnError[List[Account]]())

// This works. As does any `batchSize(n)` where N is greater than the number of documents the selector returns
collection.find(
selector = BSONDocument(
"created" -> BSONDocument("$lte" -> someDateTime.format(DateTimeFormatter.ISO_DATE_TIME))
),
projection = Option.empty[Account]
).singleBatch()
.cursor[Account]()
.collect[List](Int.MaxValue, Cursor.FailOnError[List[Account]]())

// This fails with the mentioned error
collection.find(
selector = BSONDocument(
"created" -> BSONDocument("$lte" -> someDateTime.format(DateTimeFormatter.ISO_DATE_TIME))
),
projection = Option.empty[Account]
).batchSize(50)
.cursor[Account]()
.collect[List](60, Cursor.FailOnError[List[Account]]())

})

Cédric Chantepie

unread,
Feb 3, 2021, 8:15:26 AM2/3/21
to ReactiveMongo - http://reactivemongo.org
I cannot reproduce any error with MongoDB 4.4 (or MongoDB 4.0) with ReactiveMongo 1.0.1 .

Populate test collection using MongoShell:

> for (i = 0; i < 500; i++) { db.tmp.insert({ _id: i }) }
WriteResult({ "nInserted" : 1 })
> db.tmp.find({})
{ "_id" : 0 }
{ "_id" : 1 }
{ "_id" : 2 }
{ "_id" : 3 }
{ "_id" : 4 }
{ "_id" : 5 }
{ "_id" : 6 }
{ "_id" : 7 }
{ "_id" : 8 }
{ "_id" : 9 }
{ "_id" : 10 }
{ "_id" : 11 }
{ "_id" : 12 }
{ "_id" : 13 }
{ "_id" : 14 }
{ "_id" : 15 }
{ "_id" : 16 }
{ "_id" : 17 }
{ "_id" : 18 }
{ "_id" : 19 }
Type "it" for more
> db.tmp.count({})
500

In SBT console:

val res1 = Await.result(
  db.collection[BSONCollection]("tmp")
    .find(BSONDocument.empty)
    .cursor[BSONDocument]()
    .collect[List](Int.MaxValue, Cursor.FailOnError[List[BSONDocument]]()),
  5.seconds)

res1.size

/* Result OK:

res1: List[reactivemongo.api.bson.BSONDocument] = List(BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<...

scala>

scala> res1.size
res7: Int = 500
 */


val res2 = Await.result(
  db.collection[BSONCollection]("tmp")
    .find(BSONDocument.empty)
    .batchSize(50)
    .cursor[BSONDocument]()
    .collect[List](Int.MaxValue, Cursor.FailOnError[List[BSONDocument]]()),
  5.seconds)

res2.size

/* Result:

res2: List[reactivemongo.api.bson.BSONDocument] = List(BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<non-empty>), BSONDocument(<...

scala> res2.size
res9: Int = 500

 */

kate.hyp...@gmail.com

unread,
Feb 3, 2021, 11:34:39 AM2/3/21
to ReactiveMongo - http://reactivemongo.org
Thanks so much for your example, that helped me narrow down my issue much better!

The code as you provide definitely worked for me,  so I went in and tried to figure out where my code was going wrong. When testing to see if the transaction was causing problems, I was removing starting the transaction, but not starting the session, and I think it's the session causing the error.

The code below (run through SBT console) throws the same error (With the same collection populated with your example):

db.startSession().map(dbWithSession => {
  val response = Await.result(
   dbWithSession.collection[BSONCollection]("tmp")
    .find(
    selector = BSONDocument.empty,
    )
    .cursor[BSONDocument]()
    .collect[List](Int.MaxValue, Cursor.FailOnError[List[BSONDocument]]()),
  5.seconds)

  response.size
})

Results in:

scala.concurrent.Future[Int] = Future(Failure(DatabaseException['Cursor session id (ec783b2f-a92b-3dea-9916-4b3069470108 - 4wrEYPFdyaRlj/wDR3Gvey6KtXWmGsZMz0DPB8jBWA0=) is not the same as the operation context's session id (none)' (code = 13)]))

Cédric Chantepie

unread,
Feb 4, 2021, 5:35:36 PM2/4/21
to ReactiveMongo - http://reactivemongo.org
Try with 1.0.3-SNAPSHOT

kate.hyp...@gmail.com

unread,
Feb 4, 2021, 6:01:18 PM2/4/21
to ReactiveMongo - http://reactivemongo.org
Perfect! That fixed the issue!

Thanks again!

kate.hyp...@gmail.com

unread,
Feb 8, 2021, 1:08:54 PM2/8/21
to ReactiveMongo - http://reactivemongo.org
I apologize, I should have tested further before replying, but I'm getting new errors with the snapshot version when I actually try to start a transaction:

This code (Same as before but starting a transaction too):

db.startSession().map(dbSession => {
      dbSession.startTransaction(None).map(dbWithTransaction => {
        val response = Await.result(
          dbWithTransaction.collection[BSONCollection]("tmp")
            .find(
              selector = BSONDocument.empty,
            )
            .cursor[BSONDocument]()
            .collect[List](Int.MaxValue, Cursor.FailOnError[List[BSONDocument]]()),
          5.seconds)

        response.size
      })
    })

Results in this error: scala.concurrent.Future[scala.concurrent.Future[Int]] = Future(Success(Future(Failure(DatabaseException['tmongo ' (code = 251)]))))

kate.hyp...@gmail.com

unread,
Feb 8, 2021, 1:10:46 PM2/8/21
to ReactiveMongo - http://reactivemongo.org
I didn't copy/paste that error correctly: DatabaseException['Given transaction number 1 does not match any in-progress transactions. The active transaction number is -1' (code = 251)]

Cédric Chantepie

unread,
Feb 8, 2021, 3:23:28 PM2/8/21
to ReactiveMongo - http://reactivemongo.org
Cannot reproduce such issue with a local RS and 1.0.3.

val db = rm.lastDatabase

def res = for {
  dbSession <- db.startSession()
  dbTx <- dbSession.startTransaction(None)
  col = dbTx.collection[BSONCollection]("tmp")
  docs <- col.find(BSONDocument.empty).
  cursor[BSONDocument]().
  collect[List]()
} yield docs

val docs = Await.result(res, 5.seconds)

docs.size

res11: Int = 500

kate.hyp...@gmail.com

unread,
Feb 10, 2021, 10:01:49 AM2/10/21
to ReactiveMongo - http://reactivemongo.org
Ah, for whatever reason, once I moved off the snapshot to the actual new version it did start working for me. I'm sure it was just some bad set up I had.

Thanks again!
Reply all
Reply to author
Forward
0 new messages