Closing an input stream used for an AsyncStream/StreamingResponse

81 views
Skip to first unread message

nfss...@gmail.com

unread,
Nov 1, 2017, 9:14:50 AM11/1/17
to finatra-users
I'm trying to create a REST endpoint that returns a very large response, represented by an InputStream. Due to the large size of the response, I prefer not to fully buffer it the server before sending it to the client. Additionally, I don't know the size ahead of time, so I'm using chunked transfers.

I'm using the StreamingResponse and AsyncStream classes, which seem to be just the right tool for this job.

This is my implementation:

get("/foobar") { request: Request =>
val resultStream: MyInputStream = ...
val reader: Reader = com.twitter.io.Reader.fromStream(resultStream)
val asyncStream = AsyncStream.fromReader(reader, RESPONSE_CHUNK_SIZE)
StreamingResponse(Predef.identity[Buf], headers = headers)(asyncStream)
}

My problem is that I must close the input stream in order to release the resources used to generate the stream. However, the close method of the inputstream is never called by Finatra, even after the client finishes reading the response and exits.

Is there a way of receiving a notification when Finatra finishes streaming the response to the client? I can implement a custom inputstream which automatically closes itself when the last byte is read, but this seems very hacky and does not handle the cases where the response is interrupted and the inputstream is not fully read.

I'm using Finatra 2.3.0 (cannot upgrade because the dependency on Netty 4.1 makes it incompatible with Spark, which I am also using in this program).

Thank you in advance
Nuno

Christopher Coco

unread,
Nov 5, 2017, 11:28:40 AM11/5/17
to nfss...@gmail.com, finatra-users
Unfortunately, right now there's no way to do this with AsyncStream and is partially why it's usage in Finatra is still considered experimental. However, we recognize this issue and are working on functionality to make this better. You could define something like:

trait ClosableAsyncStream[T] extends Closable {

   def asyncStream: AsyncStream[T]
}

You'd then have the Closable close(...) methods and your implementation would hold on to the inputstream and release it in a close() implementation. You'd use an implementation of this trait instead of the AsyncStream directly.

Hope that helps.

Thanks,
-c


--
You received this message because you are subscribed to the Google Groups "finatra-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to finatra-users+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Steve Cosenza

unread,
Mar 17, 2018, 11:15:05 AM3/17/18
to Christopher Coco, nfss...@gmail.com, finatra-users
Hi Nuno,

You can specify a closable resource when creating your StreamingResponse. For more information see: https://github.com/twitter/finatra/commit/3e9e168cc55153aeba77fe03b28ff20a5e118899

-Steve

Nuno Santos

unread,
Apr 3, 2018, 5:44:31 AM4/3/18
to Steve Cosenza, Christopher Coco, finatra-users
Thanks, it was just what we needed.

You received this message because you are subscribed to a topic in the Google Groups "finatra-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/finatra-users/0skjNCw1tec/unsubscribe.
To unsubscribe from this group and all its topics, send an email to finatra-users+unsubscribe@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages