How do I correctly ACK a message on a different thread?


Witold Szczerba

Jan 5, 2023, 9:25:45 AM
to rabbitmq-users
Hi,
I have been using RMQ for many years, and so far messages have been consumed from a queue and ACKed one by one.
Now I need something else:
  • a queue captures messages at high frequency
  • the consumer would take a message, prepare it for HTTP transport and put it in some "collection" (a map, dictionary, list, whatever)
  • when the "collection" is full or has been collecting messages for too long, I would like to send everything to the remote service, await the 200 OK response, clear the buffer and ACK all the messages
  • it's important to do it all in the right order, as I do not want to lose any message due to e.g. a service restart or some failure, so ACK only after 200 OK
  • I do not care if remote service receives a duplicate
So, what I would do is:
  • create a channel for message consumer, prefetchCount=200, autoAck=false
  • consume messages synchronously with no IO operations (this is the part where I do prepare a message for HTTP transport and store it in some collection)
  • at this point I do not ACK, but handling of that single message is over, so the consumer will get another one from RMQ
  • another thread will peek at the collection every now and then, eventually send all the prepared data over HTTP, await 200 OK and... HERE IS THE QUESTION
I am looking at https://www.rabbitmq.com/dotnet-api-guide.html#concurrency (I am coding in F#, so I use the dotnet API, but it should apply to other libraries as well).

According to the aforementioned docs I should avoid sharing the channel (aka IModel in dotnet API) but my use case requires a separate background thread to look after the buffered messages and to ACK messages, so RMQ would know it is safe to clear the queue.

I know I can synchronize all usage of the channel/IModel in my own code, but the same channel is used by `channel.BasicConsume`, and I have no control over what RabbitMQ does with that channel or when.

The docs mention this:
> Channel instances must not be shared by threads that publish on them.

So as far as I can see, if my background thread is the only one publishing ACKs on the channel (and I do not use the channel for anything else), it is not a problem that another thread (managed by RMQ) uses the same channel to receive messages, right?

Regards,
Witold Szczerba

Luke Bakken

Jan 5, 2023, 9:49:27 AM
to rabbitmq-users
Hi Witold,

You should definitely be using RabbitMQ's multi-ack feature.

> So, what I would do is:
>   • create a channel for message consumer, prefetchCount=200, autoAck=false
>   • consume messages synchronously with no IO operations (this is the part where I do prepare a message for HTTP transport and store it in some collection)
>   • at this point I do not ACK, but handling of that single message is over, so the consumer will get another one from RMQ
>   • another thread will peek at the collection every now and then, eventually send all the prepared data over HTTP, await 200 OK and... HERE IS THE QUESTION

When your consumer gets 200 messages, it will then send this batch off to another thread for work to be done. After the work is done, the worker thread could do one of these things:
  • Multi-ack the batch using the same IModel instance on the worker thread. I'm pretty sure this will always work based on re-reading the doc to which you linked.
  • Set a shared variable in your consumer code indicating the last delivery tag (or tags) from the batch (or batches) "to be acked". Then a multi-ack could be sent when a new batch of messages comes in. This feels hacky enough that my first option must be OK. Plus, if messages ever stop, you'll probably have orphaned acks unless you set a timer to check the variable. Ugh.
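The second option can be sketched without any client library. This is a minimal illustration in Python, not part of either client API: `PendingAck`, `offer` and `take` are hypothetical names. The worker thread records the highest delivery tag that is safe to multi-ack, and the consumer thread collects it on its next delivery (or from a timer) and sends the ack itself, so the channel is only ever used from one thread.

```python
import threading
from typing import Optional


class PendingAck:
    """Holds the highest delivery tag that a worker has marked as safe to multi-ack."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._tag: Optional[int] = None

    def offer(self, delivery_tag: int) -> None:
        # Called by the worker thread after the batch was delivered (200 OK).
        with self._lock:
            if self._tag is None or delivery_tag > self._tag:
                self._tag = delivery_tag

    def take(self) -> Optional[int]:
        # Called by the consumer thread (or a timer); returns the tag at most once.
        with self._lock:
            tag, self._tag = self._tag, None
            return tag
```

The consumer would call `take()` at the start of each delivery callback and, if it returns a tag, send a single ack with `multiple=true` for it. As noted above, a timer is still needed for the case where deliveries stop.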
The Python client (Pika) has a means to ensure work is scheduled on the correct thread so I'm wondering if a future version of the .NET client should offer the same feature. I'll make a note of this.

Thanks,
Luke

Witold Szczerba

Jan 5, 2023, 3:12:56 PM
to rabbitmq-users
My description was a little simplified, but I think I get the idea. The messages captured by the queue are meant to be sent to (possibly) many different customers, so each message, once consumed, is (almost) instantly put in a bucket destined for a specific customer. This is where the queue consumer's job ends, leaving the ACK for someone else to send later. Using `prefetchCount` we can fine-tune the processing pace.

Simultaneously, a separate background thread checks the buckets and decides, for each bucket separately, whether this is the right time to flush its messages. It is then possible for messages 1..10 and 50..100 to fire first, while 21..49 and 101..200 wait a little longer as new messages keep coming.
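The per-customer bucket with a "full or too old" flush rule could look roughly like the Python sketch below. All names here (`Bucket`, `max_size`, `max_age_seconds`, `is_due`, `drain`) are hypothetical, and the clock is injected only to make the rule easy to test. The class is not thread-safe as written; with a consumer thread adding and a background thread draining, calls would need to be guarded by a lock.

```python
import time
from typing import Callable, List, Optional, Tuple


class Bucket:
    """Buffers (delivery_tag, payload) pairs for one customer until a flush is due."""

    def __init__(self, max_size: int, max_age_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_size = max_size
        self._max_age = max_age_seconds
        self._clock = clock
        self._items: List[Tuple[int, bytes]] = []
        self._first_at: Optional[float] = None  # when the oldest buffered item arrived

    def add(self, delivery_tag: int, payload: bytes) -> None:
        if not self._items:
            self._first_at = self._clock()
        self._items.append((delivery_tag, payload))

    def is_due(self) -> bool:
        # Due when the bucket is full or its oldest item has waited too long.
        if not self._items:
            return False
        too_old = self._clock() - self._first_at >= self._max_age
        return len(self._items) >= self._max_size or too_old

    def drain(self) -> List[Tuple[int, bytes]]:
        # Hand the buffered items to the sender and start over with an empty bucket.
        items, self._items = self._items, []
        self._first_at = None
        return items
```

The delivery tags returned by `drain()` are the ones to ACK once the remote service has answered 200 OK for that batch.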

This is where ACKing becomes tricky. I would love to use channel.BasicAck with `multiple=true`, but that works only if all the messages within a range are ready to go. I will probably implement something like a "smartAck": the ACKing function will look through all the delivery tags, use `multiple=true` where it can, and fall back to individual ACKs when needed.
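One way such a "smartAck" could plan its calls is sketched below in Python. It relies on the protocol rule that an ack with `multiple=true` acknowledges every outstanding delivery tag on that channel up to and including the given tag. The function name `plan_acks` and the idea that the caller tracks the set of outstanding (delivered but not yet ACKed) tags are assumptions made for illustration. Under that rule, only a gap-free run at the low end of the outstanding tags can be covered by one multi-ack; any ready tag sitting above a tag that is not ready has to be ACKed individually.

```python
from typing import Iterable, List, Optional, Set, Tuple


def plan_acks(outstanding: Set[int], ready: Iterable[int]) -> List[Tuple[int, bool]]:
    """Return (delivery_tag, multiple) pairs that ack exactly the ready tags."""
    # Ignore tags that are not outstanding (e.g. already ACKed earlier).
    ready_set = set(ready) & outstanding

    # Find the longest run of ready tags starting at the lowest outstanding tag.
    prefix_end: Optional[int] = None
    for tag in sorted(outstanding):
        if tag not in ready_set:
            break
        prefix_end = tag

    plan: List[Tuple[int, bool]] = []
    if prefix_end is not None:
        # A single multi-ack covers the whole gap-free prefix.
        plan.append((prefix_end, True))

    # Everything else sits above a tag that is not ready: ack one by one.
    for tag in sorted(ready_set):
        if prefix_end is None or tag > prefix_end:
            plan.append((tag, False))
    return plan
```

With tags 1..200 outstanding and 1..10 plus 50..100 ready, the plan is one multi-ack for tag 10 followed by 51 individual acks for 50..100. Delivery tags are scoped to a channel, so a plan like this has to be computed per channel.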

I think it would be great if instead of `multiple=true` we could just pass the collection of delivery tags to be ACKed and the driver would find its way to optimize the process. Not complaining though, the RMQ server and client libraries are nothing but a great experience for us since 2015, and we use NodeJS, Python, JDK and most recently dotnet/F# (since Core 3.0) clients, all with almost no issues.

Thanks,
Witold Szczerba