Java concurrent ordered response buffer


Piotr Morgwai Kotarbinski

Apr 20, 2021, 12:09:59 PM
to grpc.io
Hello,
I have a stream of messages coming from a websocket or a gRPC client. For each message, my service produces 0 or more reply messages. By default, both websocket endpoints and gRPC request observers are guaranteed to be called by at most 1 thread concurrently, so my replies are sent in the same order as the requests. Now I want to dispatch request processing to other threads and process requests in parallel, while still preserving the order. Therefore, I need some "concurrent ordered response buffer", which will buffer replies to a given request message until the processing of the previous requests has finished and the replies to them have been sent (in the order they were produced within each "request bucket").
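The bucket idea described above can be sketched in plain Java. This is a minimal, single-file model, not the actual library class mentioned later in the thread: the names `OrderedBuffer`, `addBucket`, `write` and `close` are illustrative.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Minimal sketch of a "concurrent ordered response buffer": each request
 * reserves a numbered bucket; replies written to the head bucket stream
 * straight to the output, while replies to later buckets are buffered
 * until every earlier bucket has been closed.
 */
public class OrderedBuffer<T> {
    private final Consumer<T> output;                        // ordered sink for replies
    private final TreeMap<Integer, List<T>> buffered = new TreeMap<>();
    private final Set<Integer> closed = new HashSet<>();
    private int head = 0;                                    // bucket allowed to stream directly
    private int nextNumber = 0;

    public OrderedBuffer(Consumer<T> output) { this.output = output; }

    /** Reserves the next bucket; call in request-arrival order. */
    public synchronized int addBucket() {
        buffered.put(nextNumber, new ArrayList<>());
        return nextNumber++;
    }

    /** Adds a reply: forwarded at once if its bucket is the head, buffered otherwise. */
    public synchronized void write(int bucket, T message) {
        if (bucket == head) output.accept(message);
        else buffered.get(bucket).add(message);
    }

    /** Marks a bucket complete; advances the head past consecutive closed buckets. */
    public synchronized void close(int bucket) {
        closed.add(bucket);
        while (closed.remove(head)) {
            buffered.remove(head);
            head++;
            List<T> backlog = buffered.get(head);            // flush the new head's backlog
            if (backlog != null) { backlog.forEach(output); backlog.clear(); }
        }
    }
}
```

For example, if bucket 1 closes before bucket 0, its replies stay buffered and are flushed (in their internal order) the moment bucket 0 closes.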

I could develop such a class myself, but it seems like a common case, so I was wondering whether such a thing already exists (so as not to reinvent the wheel). However, I could not easily find anything on the web, nor get any answer on SO. Does anyone know about something like this?

Thanks!

Piotr Morgwai Kotarbinski

Apr 22, 2021, 12:20:59 PM
to grpc.io
In case someone else needs it too, I've written it myself due to the lack of answers both here and on SO:
feedback is welcome :)

Eric Anderson

Apr 27, 2021, 7:52:08 PM
to Piotr Morgwai Kotarbinski, grpc.io
Yeah, we don't have anything pre-existing that does something like that; it gets into the specifics of your use-case. Making something yourself was appropriate. I will say that the strategy used in OrderedConcurrentOutputBuffer with the Buckets seems really clean.

--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/e7107eed-fa35-4b2e-8d5a-5754e0a37740n%40googlegroups.com.

Piotr Morgwai Kotarbinski

Apr 28, 2021, 3:57:46 AM
to grpc.io
Thanks :)

I was actually surprised, because I thought parallel request message processing was a common use case, both for gRPC and websockets.
(For example, if we have a service that does some single-threaded graphic processing on received images and sends back a modified version of each image, it would be most efficient to dispatch the processing to a thread pool with a size corresponding to the available CPU/GPU cores, right? Processing the requests sequentially would utilize just 1 core per request stream, so with a low number of concurrent request streams we would be underutilizing the cores.)
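The sizing idea from the example above, as a rough sketch with a fixed pool (the service name and the invert "processing" are made up for illustration, not taken from the thread):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Illustrative sketch: dispatch per-message, CPU-bound processing to a pool
 * sized to the number of available cores, so that even a single request
 * stream can keep all cores busy.
 */
public class ImageService {
    private final ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /** Submits processing of one message; the caller gets a future reply. */
    public Future<byte[]> dispatch(byte[] image) {
        return pool.submit(() -> process(image));
    }

    /** Stand-in for the single-threaded graphic processing from the example. */
    private byte[] process(byte[] image) {
        byte[] result = image.clone();
        for (int i = 0; i < result.length; i++) result[i] ^= (byte) 0xFF;  // e.g. invert pixels
        return result;
    }

    public void shutdown() { pool.shutdown(); }
}
```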

Cheers! 

nitish bhardwaj

Apr 29, 2021, 9:59:11 PM
to grpc.io

Thanks for contributing OrderedConcurrentOutputBuffer. I totally agree, this would be really useful for utilizing cores efficiently.

Piotr Morgwai Kotarbinski

Apr 30, 2021, 5:10:25 AM
to grpc.io
Please note that this class should only be used when the response message ordering requirement cannot be dropped: if you control a given proto interface definition, it is more efficient to add some unique id to request messages, include it in the response messages, and send them as soon as they are produced, so nothing needs to be buffered.
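The id-correlation alternative can be sketched like this (the `Request`/`Response` types and method names are illustrative; in a real service the id fields would live in the proto messages):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Sketch of the id-correlation pattern: every response carries the id of the
 * request it answers and is sent as soon as it is produced; the receiver
 * matches responses to requests by id, so no ordering buffer is needed.
 */
public class IdCorrelation {
    record Request(long id, String payload) {}
    record Response(long id, String payload) {}

    /** Server side: echo the request's id into every response it produces. */
    static Response handle(Request req) {
        return new Response(req.id(), req.payload().toUpperCase());
    }

    /** Client side: index responses by id, regardless of arrival order. */
    static Map<Long, Response> collect(Iterable<Response> arrivals) {
        Map<Long, Response> byId = new ConcurrentHashMap<>();
        for (Response r : arrivals) byId.put(r.id(), r);
        return byId;
    }
}
```

Even if the response to request 2 arrives before the response to request 1, the client can still pair each reply with its request.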

Cheers!

Piotr Morgwai Kotarbinski

May 5, 2021, 2:58:01 PM
to grpc.io
To fill the void left by my previous message  ;-)  here is ConcurrentRequestObserver, which eases developing bi-di streaming methods that dispatch work to other threads but don't need to preserve the order of responses:


It handles all the synchronization and manual flow control to maintain the desired concurrency level while also preventing excessive buffering of response messages.
This one is gRPC-specific, as websockets have neither half-closing nor manual flow control, so there is not much left to ease up there  ;-]
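Without the gRPC types, the flow-control idea can be modeled with a plain `Semaphore`: a slot is acquired before a message is processed and released when it finishes (conceptually the point where the observer would request the next message). `BoundedDispatcher` below is an illustrative stand-in, not the actual ConcurrentRequestObserver API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of bounded-concurrency dispatching: at most `concurrency` tasks are
 * in flight at once, and a new one is admitted only when a slot frees up,
 * which bounds how many responses can pile up waiting to be sent.
 */
public class BoundedDispatcher {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Semaphore slots;

    public BoundedDispatcher(int concurrency) {
        this.slots = new Semaphore(concurrency);
    }

    /** Blocks until one of the slots frees up, then runs the task on the pool. */
    public void dispatch(Runnable task) {
        slots.acquireUninterruptibly();     // back-pressure instead of unbounded queuing
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                slots.release();            // analogous to requesting the next message
            }
        });
    }

    public void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```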

Cheers!

Eric Anderson

May 5, 2021, 8:15:56 PM
to Piotr Morgwai Kotarbinski, grpc.io
On Wed, Apr 28, 2021 at 12:57 AM Piotr Morgwai Kotarbinski <mor...@gmail.com> wrote:
I was surprised actually, because I thought parallel request message processing was a common use-case, both for gRPC and websockets

Yeah, we call those "RPCs." They can scale very wide horizontally.

(for example if we have a service that does some single-threaded graphic processing on received images and sends back a modified version of a given image, it would be most efficient to dispatch the processing to a thread pool with a size corresponding to available CPU/GPU cores, right? As processing them sequentially would utilize just 1 core per request stream, so in case of low number of concurrent request streams, we would be underutilizing the cores).

What's more rare is to have in-order processing (using a stream) that can be split to multiple threads, as in-order and multi-threaded tend to be at odds. If you are just processing images, then you can commonly process them each individually and would use separate RPCs. For in-order+multi-threaded, you need to cut up the input data in a server-specific way (e.g., I-frame frequency for video encoding), do the heavy lifting in a thread for each chunk, and then recombine at the end. (If it isn't server-specific, the client is more likely to do the cutting.) The cutting and merging easily become application-specific. Although your tool would probably help many cases even with application-specific logic, it's just not a common enough case for us to have utilities on hand. I think I've seen most of such cases show up in server→client processing, like pubsub, where the server is actually sending work to the client yet can't "just do another RPC" to the client.

That said, I'm certain there are others that will be happy to pick up and use your code. Thanks for posting it!

Piotr Morgwai Kotarbinski

May 11, 2021, 10:03:20 AM
to grpc.io
Pushed both classes (and some others) to Maven Central: https://search.maven.org/artifact/pl.morgwai.base/grpc-utils/1.0-alpha1/jar

Cheers!