Iterating over a cursor returned from an aggregation


Sunil Pasumarthi

Nov 19, 2016, 12:03:52 PM
to ReactiveMongo - http://reactivemongo.org
Hi,

I am using play2-reactivemongo 0.11.11 and I am trying to iterate over the cursor returned from an aggregation (which produces a very large data set).

I know that setting the batch size of the cursor applies only to the first batch, but I want to iterate over all the successive batches as well.
The MongoDB docs suggest using OP_GET_MORE to iterate over the next batches, but I can't find a way to use that.

Is there any way I could achieve this? Or can anyone suggest an alternative method?

Thanks

Cédric Chantepie

Nov 19, 2016, 6:41:53 PM

Sunil Pasumarthi

Nov 27, 2016, 6:43:23 AM
That new aggregation function returns successive batches of size 1 after the initial batch.

Cédric Chantepie

Nov 27, 2016, 11:09:21 AM
According to the API documentation, the batch size is configurable using QueryOpts.batchSize: http://reactivemongo.org/releases/0.12/api/index.html#reactivemongo.api.QueryOpts

Sunil Pasumarthi

Nov 28, 2016, 4:08:40 AM
I have updated the Play plugin to "org.reactivemongo" %% "play2-reactivemongo" % "0.12.0"

and I have tried the following:

// Empty match: every document goes through the pipeline
val matchOp = collection.BatchCommands.AggregationFramework.Match(Json.obj())
val pipeline = List(
  collection.BatchCommands.AggregationFramework.Group(Json.toJson("$basic_info.device_uuid"))(
    "install_date" -> collection.BatchCommands.AggregationFramework.First("session_details.launch_time"))
)
// Requested batch size for the aggregation cursor
val cursor = collection.BatchCommands.AggregationFramework.Cursor(100)

// Pass the group stage defined above as the remaining pipeline operators
val returnedCursor = collection.aggregate[JsValue](
  matchOp, pipeline, Some(cursor),
  explain = false, allowDiskUse = true, bypassDocumentValidation = true,
  readConcern = None, readPreference = ReadPreference.Primary)

// Log the size of each batch as it arrives
returnedCursor.foldBulks(List.empty[JsValue])(
  { (ls, iter) =>
    val list = iter.toList
    logger.debug("size " + list.size)
    Cursor.Cont(ls)
  }, { (ls, err) =>
    logger.error("error fetching documents" + err.toString)
    Cursor.Fail(err)
  }
)
      
Sample of the log:
size 100
size 1
size 1
size 1
size 1
size 1
size 1
size 1
size 1

All I want is for the successive batches to also have a size of 100.

Cédric Chantepie

Nov 29, 2016, 5:49:19 AM
As indicated in the MongoDB documentation, the batchSize of the cursor option for the aggregation command "specifies the size of the initial batch size only. Specify subsequent batch sizes to OP_GET_MORE operations as with other MongoDB cursors." ( https://docs.mongodb.com/manual/reference/command/aggregate/#dbcmd.aggregate ). This means, as suggested before, that the size of the subsequent batches must be specified using QueryOpts.batchSize.
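
To make the distinction concrete, here is a toy model in plain Scala. It is not ReactiveMongo code and not the driver's implementation, only a sketch of the arithmetic: the first batch takes its size from the cursor option of the aggregate command, and every later batch takes whatever size the getMore asks for. The names BatchModel and batchSizes are hypothetical, and the total of 108 documents used in the note below is a made-up figure chosen to match the sample log.

```scala
// Toy model of how batch sizes come out of a server-side cursor.
// Hypothetical helper for illustration; it does not talk to MongoDB.
object BatchModel {

  /** Sizes of the batches a client would receive for `total` documents,
    * when the initial batch holds up to `firstBatchSize` documents and
    * each subsequent getMore requests up to `getMoreSize` documents.
    */
  def batchSizes(total: Int, firstBatchSize: Int, getMoreSize: Int): List[Int] = {
    require(total >= 0, "total must not be negative")
    require(firstBatchSize > 0 && getMoreSize > 0, "batch sizes must be positive")

    // Initial batch: sized by the cursor option of the aggregate command.
    val first = math.min(firstBatchSize, total)

    // Remaining documents: delivered in chunks sized by each getMore.
    val rest = Iterator
      .iterate(total - first)(_ - getMoreSize)
      .takeWhile(_ > 0)
      .map(remaining => math.min(remaining, getMoreSize))
      .toList

    if (first == 0) Nil else first :: rest
  }
}
```

Under these assumptions, BatchModel.batchSizes(108, 100, 1) gives List(100, 1, 1, 1, 1, 1, 1, 1, 1), the pattern seen in the log, while BatchModel.batchSizes(250, 100, 100) gives List(100, 100, 50), which is what a configurable getMore size would produce.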

Sunil Pasumarthi

Dec 2, 2016, 6:44:22 AM
I am unable to provide a batch size using QueryOpts.batchSize for the aggregation cursor. I have searched the documentation and couldn't find anything about it.

Could you elaborate on how to do it, with an example?

Thanks

Sunil Pasumarthi

Dec 13, 2016, 5:31:43 AM
I have looked into the source code to see how subsequent batch sizes are handled, and I found this line in the commands.scala file: """val op = Query(flags, db.name + ".$cmd", 0, 1)""". Was it left that way for streaming purposes, or are there performance issues?

Cédric Chantepie

Dec 13, 2016, 2:42:13 PM
The command runner expects a single document. Cursor support for commands such as aggregation is implemented using getMore, so I don't see an issue with this code.

The batch size for the getMore of such commands is not yet configurable.

Sunil Pasumarthi

Dec 14, 2016, 7:44:38 AM
So there is no way to configure subsequent batch sizes for an aggregation cursor in version 0.12.0?

Is there a plan to implement this in upcoming releases?
If there is no plan, I can raise an issue on GitHub.

Cédric Chantepie

Dec 14, 2016, 9:32:33 AM
It's planned, but there is no ongoing development on that.

Yardena Meymann

Nov 16, 2017, 9:54:57 AM
Hi Cedric,

I am having a similar issue: I need to retrieve a large result set produced by an aggregation query. If I don't specify a batch size, the query fails because it exceeds 16MB. If I specify a batch size (50), the first batch returns with 50 documents, but the rest come one by one, just as Sunil described, and the query takes "forever". I am using the latest ReactiveMongo. Do you have any advice on how to solve the problem?

  Thanks in advance,
    Yardena 