Using the async GridFS api with asynchronously available data


Steve Hummingbird

Apr 14, 2016, 9:45:18 AM
to mongodb-user
I am reposting this here, as I am not sure whether it just got buried in the already closed JIRA issue.
The driver is built from the current Java master branch.

I was trying to use the new GridFS API, but I ran into trouble with data that only becomes available asynchronously, although I might just be missing something obvious. I am using Vert.x 2, where I have to register a handler that is called whenever new data is available.

Apparently I cannot simply call the write() method whenever data arrives, as in the snippet below, because that leads to an exception: com.mongodb.MongoGridFSException: The AsyncOutputStream does not support concurrent writing.

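```groovy
import com.mongodb.async.SingleResultCallback
import java.nio.ByteBuffer

GridFSUploadStream uploadStream = gridFS.openUploadStream('test', options)
fileUpload.dataHandler { Buffer buffer ->
    // Vert.x invokes this for every chunk, so a new write() can start before the
    // previous one has completed; that overlap is what triggers the exception.
    uploadStream.write(ByteBuffer.wrap(buffer.bytes), { Integer bytesWritten, Throwable t ->
        System.out.println('wrote: ' + bytesWritten + ' bytes. t: ' + t)
    } as SingleResultCallback<Integer>)
}
```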

This suggests that I have to buffer the data myself until the GridFS driver invokes the callback. Only then can I pass in the next slice of data, along with another callback to handle whichever slice has arrived in the meantime; and if that slice is not available yet, things get even more complicated.

If I implement my own AsyncInputStream, the same applies: I would need to buffer the data until the driver calls the read method, at which point I can provide a slice of data, which seems quite cumbersome. Also, if the next slice is not yet available when read is called and I provide 0 bytes, does the driver periodically poll until data becomes available?

So the only option I can see is to buffer all the data until everything is available and only then call the GridFS driver, but that does not seem very efficient.

I guess there must be a better way that I am just missing. Could you please explain how the driver should be used in such a case?

Ross Lawley

Apr 18, 2016, 7:13:27 AM

Hi Steve,


To mirror Jeff's comment in JAVA-1282, this is an example of the producer-consumer problem: someone has to control the data flow. Because a file being uploaded into GridFS could be huge, the driver does not buffer the data itself, as doing so would risk an out-of-memory error. This is similar to the AsynchronousSocketChannel API, which also expects the producer to handle flow control.


You are correct: you should call uploadStream.write sequentially, one buffer at a time, and if a new buffer arrives before the uploadStream has processed the previous one, it should be buffered and written when the uploadStream.write callback is triggered. I know Vert.x has back-pressure support via the Reactive Streams API, so it may be possible to convert the code that writes the data to that API and avoid replicating the buffering logic.
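A minimal Java sketch of that buffering, under stated assumptions: `AsyncSink` is a hypothetical stand-in with the same shape as the driver's `AsyncOutputStream.write(ByteBuffer, SingleResultCallback<Integer>)` (a `BiConsumer<Integer, Throwable>` replaces the callback type so the sketch runs without the driver), and `SerializedWriter`, its high-water mark and the `pauseProducer`/`resumeProducer` hooks are illustrative names, not driver or Vert.x API. At most one write is in flight; later chunks wait in a queue and are written from the previous write's callback.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the driver's AsyncOutputStream.write(ByteBuffer,
// SingleResultCallback<Integer>); the callback gets (bytesWritten, error).
interface AsyncSink {
    void write(ByteBuffer src, BiConsumer<Integer, Throwable> callback);
}

// Keeps at most one write in flight. Chunks that arrive meanwhile are queued
// and written from the previous write's callback; the producer is paused once
// highWaterMark chunks are queued and resumed when the queue is half drained.
final class SerializedWriter {
    private final AsyncSink sink;
    private final Queue<ByteBuffer> pending = new ArrayDeque<>();
    private final int highWaterMark;
    private final Runnable pauseProducer;
    private final Runnable resumeProducer;
    private boolean writing;
    private boolean paused;
    private Throwable failure;

    SerializedWriter(AsyncSink sink, int highWaterMark,
                     Runnable pauseProducer, Runnable resumeProducer) {
        this.sink = sink;
        this.highWaterMark = highWaterMark;
        this.pauseProducer = pauseProducer;
        this.resumeProducer = resumeProducer;
    }

    // Call from the producer's data handler for every incoming chunk.
    synchronized void offer(ByteBuffer chunk) {
        if (failure != null) throw new IllegalStateException("upload failed", failure);
        pending.add(chunk);
        if (!paused && pending.size() >= highWaterMark) {
            paused = true;
            pauseProducer.run();
        }
        if (!writing) writeNext();
    }

    synchronized boolean isWriting() {
        return writing;
    }

    // Starts the next queued write, or marks the writer idle if nothing is queued.
    private void writeNext() {
        ByteBuffer next = pending.poll();
        if (next == null) {
            writing = false;
            return;
        }
        writing = true;
        sink.write(next, (bytesWritten, error) -> onWritten(next, error));
    }

    private synchronized void onWritten(ByteBuffer chunk, Throwable error) {
        if (error != null) {
            failure = error;          // stop on the first error
            pending.clear();
            writing = false;
            return;
        }
        if (chunk.hasRemaining()) {   // partial write: finish this chunk first
            sink.write(chunk, (bytesWritten, e) -> onWritten(chunk, e));
            return;
        }
        writeNext();
        if (paused && pending.size() <= highWaterMark / 2) {
            paused = false;
            resumeProducer.run();
        }
    }
}
```

In Vert.x the pause/resume hooks would presumably call the upload's `pause()` and `resume()`. Closing the upload stream once the producer's end handler has fired and the queue has drained is left out of the sketch.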


> In case I implement my own AsyncInputStream, the same thing applies. I would need to somehow buffer the data until the read method is called by the driver where I can provide a slice of data, which in turn seems quite cumbersome to me. And just in case, if the next slice of data is not yet available when the read method is called, and I provide 0 bytes, does the driver periodically poll if the data is available yet?


I'm not sure I follow here: when reading data from GridFS, you provide the buffer to be filled, and the callback tells you how many bytes were transferred. So I think I'm missing something; could you provide an example of using that kind of API in Vert.x?
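To illustrate the read side described here, a hedged Java sketch of a read loop that reuses one caller-provided buffer and issues the next `read()` only from inside the previous callback. `AsyncSource` and `ReadLoop.pump` are hypothetical names; `AsyncSource` only mimics the shape of a callback-based `read(ByteBuffer, callback)` and assumes, as with the driver's AsyncInputStream, that a result of -1 signals the end of the stream.

```java
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for a callback-based input stream: fills dst and
// reports the number of bytes read, or -1 once the stream is exhausted.
interface AsyncSource {
    void read(ByteBuffer dst, BiConsumer<Integer, Throwable> callback);
}

final class ReadLoop {
    // Reads until end of stream, reusing one caller-provided buffer. The next
    // read() is issued only from inside the previous callback, so at most one
    // read is outstanding and no polling is needed.
    static void pump(AsyncSource source, ByteBuffer buffer,
                     Consumer<ByteBuffer> onChunk, Consumer<Throwable> onComplete) {
        buffer.clear();
        source.read(buffer, (bytesRead, error) -> {
            if (error != null) {
                onComplete.accept(error);
                return;
            }
            if (bytesRead < 0) {         // end of stream
                onComplete.accept(null);
                return;
            }
            buffer.flip();               // expose exactly the bytes just read
            onChunk.accept(buffer);      // consumer must use the bytes before returning
            pump(source, buffer, onChunk, onComplete);
        });
    }
}
```

The recursion only grows the stack when the source completes synchronously, as in a test; a source that calls back on another thread starts each callback on a fresh stack.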

Steve Hummingbird

Apr 18, 2016, 5:37:51 PM
Hi Ross,

Thank you, that cleared some things up. I ended up implementing both Mongo's AsyncInputStream and Vert.x's WriteStream in my own class, which handles the back pressure and buffers the data in a Queue.
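A minimal Java sketch of that kind of queue-backed bridge, assuming a hypothetical `AsyncSource` interface that mirrors the shape of AsyncInputStream's `read(ByteBuffer, callback)`, with a `BiConsumer<Integer, Throwable>` in place of `SingleResultCallback<Integer>`. The producer-side methods (`offer`, `end`, `isFull`, `onDrain`) are hypothetical stand-ins for what a Vert.x WriteStream's `write`, `writeQueueFull` and `drainHandler` might delegate to, with `end()` called from the producer's end handler. Instead of answering a read with 0 bytes, an early read is parked and completed by the next `offer()` or `end()`, so nothing needs to poll.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BiConsumer;

// Hypothetical stand-in with the shape of AsyncInputStream.read(dst, callback):
// the callback gets the number of bytes read, or -1 at end of stream.
interface AsyncSource {
    void read(ByteBuffer dst, BiConsumer<Integer, Throwable> callback);
}

// Bridges a push-style producer (offer/end) to a pull-style consumer (read).
final class QueueBackedSource implements AsyncSource {
    private final Queue<ByteBuffer> queue = new ArrayDeque<>();
    private final int maxQueuedBytes;
    private int queuedBytes;
    private boolean ended;
    private ByteBuffer parkedDst;                       // read() waiting for data
    private BiConsumer<Integer, Throwable> parkedCallback;
    private Runnable drainHandler = () -> { };

    QueueBackedSource(int maxQueuedBytes) {
        this.maxQueuedBytes = maxQueuedBytes;
    }

    // Producer side: queue a chunk and complete a parked read, if any.
    synchronized void offer(ByteBuffer chunk) {
        if (ended) throw new IllegalStateException("offer() after end()");
        if (!chunk.hasRemaining()) return;
        queue.add(chunk);
        queuedBytes += chunk.remaining();
        completeParkedRead();
    }

    synchronized void end() {
        ended = true;
        completeParkedRead();
    }

    // The producer should pause while this is true and resume from the drain handler.
    synchronized boolean isFull() {
        return queuedBytes >= maxQueuedBytes;
    }

    synchronized void onDrain(Runnable handler) {
        drainHandler = handler;
    }

    // Consumer side: never answers with 0 bytes; parks until data or end().
    @Override
    public synchronized void read(ByteBuffer dst, BiConsumer<Integer, Throwable> callback) {
        if (parkedCallback != null) {
            callback.accept(null, new IllegalStateException("concurrent read()"));
            return;
        }
        parkedDst = dst;
        parkedCallback = callback;
        completeParkedRead();
    }

    private void completeParkedRead() {
        if (parkedCallback == null || (queue.isEmpty() && !ended)) return;
        ByteBuffer dst = parkedDst;
        BiConsumer<Integer, Throwable> callback = parkedCallback;
        parkedDst = null;
        parkedCallback = null;
        if (queue.isEmpty()) {                          // ended and fully drained
            callback.accept(-1, null);
            return;
        }
        boolean wasFull = queuedBytes >= maxQueuedBytes;
        int copied = 0;
        while (dst.hasRemaining() && !queue.isEmpty()) {
            ByteBuffer head = queue.peek();
            int n = Math.min(dst.remaining(), head.remaining());
            ByteBuffer slice = head.duplicate();
            slice.limit(slice.position() + n);
            dst.put(slice);                             // copy n bytes
            head.position(head.position() + n);
            if (!head.hasRemaining()) queue.poll();
            copied += n;
        }
        queuedBytes -= copied;
        if (wasFull && queuedBytes < maxQueuedBytes) drainHandler.run();
        callback.accept(copied, null);
    }
}
```

`synchronized` is used as a simple guard because the producer and the consumer may call in from different threads; callbacks run while the lock is held, which is acceptable for a sketch but worth revisiting in production code.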