aggregator pattern, out-of-order client acknowledgement, and limits

David Bakin

Jul 14, 2015, 11:56:08 PM
to rabbitm...@googlegroups.com, davi...@ispot.tv
I'm trying to use RabbitMQ as a durable workflow/task queue to handle not only messages between activities but also all local storage as well in the messages.  I am trying to get "at least once" semantics by using persistent exchanges/queues/messages and activities (message transducers) that are idempotent and functional (get all input via a message and send all output via a message, no local store).  An activity will pull a message from its input queue but not ack it to the broker until it has processed it (it may take quite some time) and published a message to its downflow activity.

(Right away I'd ask if anyone has pointers to software that already does this with RabbitMQ, preferably something immediately useable with a Java codebase.)

In addition, I would like to implement a scatter/gather workflow - a message will be sent to (say) 3 separate activities in parallel (using traditional pub-sub) which each will send their results to a specific activity, which will join them after all 3 results have arrived.  I have a pretty clunky way to do this using pub sub and multiple queues, and I'm looking for something better - something that will survive machine failure while still requiring no local storage (except for RabbitMQ itself).

I'm thinking of this: a consumer will pull messages from the "join queue" but ack none of them at first ... keeping them in a non-persistent map - and then when all 3 of a given set arrive (having arrived intermixed with results from other result sets) it will merge them, send the merged message on, and then ack the three.  This will clearly survive machine failure.  But will it work in practice?
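
A minimal sketch of that non-persistent map, in plain Java with no RabbitMQ dependency so the bookkeeping can be read on its own. The names `JoinBuffer`, `PendingResult` and `offer` are hypothetical, and it assumes each result carries a correlation id identifying its set and that each branch publishes exactly one result per set:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One result held in memory while it stays unacknowledged on the broker. */
final class PendingResult {
    final long deliveryTag; // broker-assigned tag, needed for the later ack
    final String body;

    PendingResult(long deliveryTag, String body) {
        this.deliveryTag = deliveryTag;
        this.body = body;
    }
}

/** Non-persistent join map: correlation id -> results received so far. */
final class JoinBuffer {
    private final int expected; // results per set, e.g. 3
    private final Map<String, List<PendingResult>> pending = new HashMap<>();

    JoinBuffer(int expected) {
        this.expected = expected;
    }

    /**
     * Records one result. Returns the complete set once all expected results
     * for the correlation id have arrived, otherwise null.
     */
    List<PendingResult> offer(String correlationId, long deliveryTag, String body) {
        List<PendingResult> set =
                pending.computeIfAbsent(correlationId, k -> new ArrayList<>());
        set.add(new PendingResult(deliveryTag, body));
        if (set.size() < expected) {
            return null; // still waiting; nothing is acked yet
        }
        pending.remove(correlationId);
        return set; // caller merges, publishes, then acks each tag
    }

    /** Number of sets that are still incomplete. */
    int openSets() {
        return pending.size();
    }
}
```

With the RabbitMQ Java client, the consumer would call `offer` from its delivery callback, and when a set comes back it would publish the merged message first and only then call `channel.basicAck(tag, false)` for each held tag. `multiple` is `false` because sets interleave, so acking "everything up to this tag" would also ack results of other, incomplete sets. This sketch only counts arrivals, so a duplicate from one branch would be counted as a separate result; a real gatherer would probably key by branch as well. If a prefetch limit is set with `basicQos`, held messages count against it, so the limit would need to sit comfortably above the number of results that can be outstanding at once.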

I should say this is a very low message rate system, probably on the order of 10s per second at most, in fact, more likely low 100s per minute (though bursty).

My questions (thanks for reading this far!) are:

a) Is there any problem with having a moderate number of unacked messages outstanding on a single queue, for some extended period of time (minutes), and acknowledging them out of order of receipt (so that much later messages may be acked before much earlier messages)?

b) Is this a reasonable way to implement the aggregation pattern given at-least-once semantics and persistent everything?

c) Is there a better way to do it?

d) Would, for example, a custom exchange be a good idea?  I'm thinking of something like the delayed message exchange except the release condition isn't satisfying the timeout but instead that the entire set of related messages has arrived - then all are released to downstream queues at once.  (Therefore using mnesia as the persistent store for the held messages, which is fine.)

Thanks for your help and I'd be very happy to get any pointers to work that's already been done in this area.  -- David

Michael Klishin

Jul 21, 2015, 7:06:36 AM
to David Bakin, rabbitm...@googlegroups.com, davi...@ispot.tv
Sorry for the delayed response. 

On 15 July 2015 at 06:56:11, David Bakin (davi...@gmail.com) wrote:
> I'm trying to use RabbitMQ as a durable workflow/task queue
> to handle not only messages between activities but also all local
> storage as well in the messages. I am trying to get "at least once"
> semantics by using persistent exchanges/queues/messages
> and activities (message transducers) that are idempotent and
> functional (get all input via a message and send all output via
> a message, no local store). An activity will pull a message from
> its input queue but not ack it to the broker until it has processed
> it (it may take quite some time) and published a message to its
> downflow activity.
>
> (Right away I'd ask if anyone has pointers to software that already
> does this with RabbitMQ, preferably something immediately
> useable with a Java codebase.)

I can't think of anything that implements scatter/gather specifically.
spring-amqp *may* have similar features, but I don't think it has scatter/gather.

> In addition, I would like to implement a scatter/gather workflow
> - a message will be sent to (say) 3 separate activities in parallel
> (using traditional pub-sub) which each will send their results
> to a specific activity, which will join them after all 3 results
> have arrived. I have a pretty clunky way to do this using pub sub
> and multiple queues, and I'm looking for something better - something
> that will survive machine failure while still requiring no local
> storage (except for RabbitMQ itself).
>
> I'm thinking of this: a consumer will pull messages from the "join
> queue" but ack none of them at first ... keeping them in a non-persistent
> map - and then when all 3 of a given set arrive (having arrived intermixed
> with results from other result sets) it will merge them, send
> the merged message on, and then ack the three. This will clearly
> survive machine failure. But will it work in practice?

Assuming you use manual acknowledgements, this should also survive failure of
the gatherer itself and network failures between the app and RabbitMQ nodes.

One issue I see in such a system is: what if responses take a very long time
to arrive? Your gatherer will need a sensible timeout and the ability
to discard messages that are no longer relevant.
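
One way the timeout could be tracked, as a rough sketch only. `StaleSetSweeper` and its methods are hypothetical names, timestamps are passed in as plain milliseconds so the logic is testable, and the age of a set is assumed to be measured from the arrival of its first result:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Tracks when each incomplete set was first seen and which deliveries it holds. */
final class StaleSetSweeper {
    private static final class OpenSet {
        final long firstSeenMillis;
        final List<Long> deliveryTags = new ArrayList<>();

        OpenSet(long firstSeenMillis) {
            this.firstSeenMillis = firstSeenMillis;
        }
    }

    private final long timeoutMillis;
    // LinkedHashMap keeps first-arrival order, so expired tags come back oldest first.
    private final Map<String, OpenSet> open = new LinkedHashMap<>();

    StaleSetSweeper(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Call for every result that is being held unacknowledged. */
    void recordArrival(String correlationId, long deliveryTag, long nowMillis) {
        open.computeIfAbsent(correlationId, k -> new OpenSet(nowMillis))
            .deliveryTags.add(deliveryTag);
    }

    /** Call once a set has been merged, published and acked. */
    void completed(String correlationId) {
        open.remove(correlationId);
    }

    /** Removes every set at least timeoutMillis old and returns the tags it held. */
    List<Long> expire(long nowMillis) {
        List<Long> discarded = new ArrayList<>();
        Iterator<OpenSet> it = open.values().iterator();
        while (it.hasNext()) {
            OpenSet set = it.next();
            if (nowMillis - set.firstSeenMillis >= timeoutMillis) {
                discarded.addAll(set.deliveryTags);
                it.remove();
            }
        }
        return discarded;
    }
}
```

The gatherer could call `expire` periodically and settle each returned tag on the channel, for example with `channel.basicAck(tag, false)` to drop it, or `channel.basicNack(tag, false, false)` so that a dead-letter exchange, if one is configured on the queue, receives the abandoned partial results. Which of the two fits depends on whether incomplete sets need to be inspected later.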

> I should say this is a very low message rate system, probably on
> the order of 10s per second at most, in fact, more likely low 100s
> per minute (though bursty).
>
> My questions (thanks for reading this far!) are:
>
> a) Is there any problem with having a moderate number of unacked
> messages outstanding on a single queue, for some extended period
> of time (minutes), and acknowledging them out of order of receipt
> (so that much later messages may be acked before much earlier
> messages)?

No. We've seen cases where hundreds of thousands of messages were unacknowledged
for many hours or even days.

> b) Is this a reasonable way to implement the aggregation pattern
> given at-least-once semantics and persistent everything? 
> c) Is there a better way to do it?

I can't think of anything substantially different from your idea
right away.

> d) Would, for example, a custom exchange be a good idea? I'm thinking
> of something like the delayed message exchange except the release
> condition isn't satisfying the timeout but instead that the
> entire set of related messages has arrived - then all are released
> to downstream queues at once. (Therefore using mnesia as the
> persistent store for the held messages, which is fine.)

A custom exchange type may help here but keep in mind that you'll have to store
your state somewhere in the cluster. Mnesia is not necessarily a great choice
given that it is not capable of merging two divergent data sets. Riak may be
a better option depending on your needs.

Note that deploying new versions of a custom plugin will likely be a lot more painful
than your apps that use RabbitMQ.

HTH. 
--
MK

Staff Software Engineer, Pivotal/RabbitMQ


David Bakin

Jul 23, 2015, 12:47:30 AM
to rabbitmq-users, davi...@ispot.tv, mkli...@pivotal.io
On Tuesday, July 21, 2015 at 4:06:36 AM UTC-7, Michael Klishin wrote:
> A custom exchange type may help here but keep in mind that you'll have to store
> your state somewhere in the cluster. Mnesia is not necessarily a great choice
> given that it is not capable of merging two divergent data sets. Riak may be
> a better option depending on your needs.
>
> Note that deploying new versions of a custom plugin will likely be a lot more painful
> than your apps that use RabbitMQ.

Thanks for the confirmation that my aggregator plan seems reasonable, and also for the tip that deploying a custom plugin may be something I'd like to avoid. -- David

 