Deleting messages from a stream


Andrea Frome

Jun 26, 2023, 10:54:10 AM
to rabbitm...@googlegroups.com
We have a use case for which a stream sounds like the right solution (e.g. based on feedback in this thread), but the strictly non-destructive semantics of streams is an issue for us. Here's the scenario...

Customers of our platform send in bursty REST requests that are published to RMQ, and we have two consumers: one that processes and loads the request into our Postgres database and one that sends it to elasticsearch. We need REST requests of various types to be reliably processed in the order they were sent by the customer, and we are relying on a single queue per consumer or a single stream to enable us to serialize the requests for processing. We need to consume every message that is published to the queue -- i.e. we don't want to drop messages if the queue gets too large. We need durability and redundancy (quorum).

We could find ourselves with backlogs of 10M+ messages at a time in both queues / the stream when the processing and loading for Postgres is slower than bursty REST requests. We expect the consumers to catch up when the REST requests slow (and the consumers will be optimized to prefetch and group consecutive messages of the same type for bulk upsert).
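A minimal sketch of that batching step follows. It assumes each prefetched message has already been decoded into a dict with a "type" field; the field name and message shape are hypothetical. itertools.groupby only merges adjacent items with equal keys, so consecutive same-type requests become one bulk-upsert batch while the customer's overall order is preserved:

```python
from itertools import groupby
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Message = Dict[str, Any]  # hypothetical decoded message shape


def batch_consecutive_by_type(
    messages: Iterable[Message],
) -> Iterator[Tuple[str, List[Message]]]:
    """Yield (type, batch) pairs in the original order.

    Only adjacent messages with the same type are merged, so a request is
    never moved ahead of an earlier request of a different type.
    """
    for msg_type, group in groupby(messages, key=lambda m: m["type"]):
        yield msg_type, list(group)


prefetched = [
    {"type": "upsert_order", "id": 1},
    {"type": "upsert_order", "id": 2},
    {"type": "delete_order", "id": 1},
    {"type": "upsert_order", "id": 3},
]
for msg_type, batch in batch_consecutive_by_type(prefetched):
    print(msg_type, [m["id"] for m in batch])
# upsert_order [1, 2]
# delete_order [1]
# upsert_order [3]
```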

Our use case appears to fit streams well EXCEPT for the fact that the only way things get deleted from the stream is if the stream exceeds size or age limits. We will set a high threshold on both of these things that we don't expect to exceed because we should not drop these messages (TBs of disk if needed). However, that means that a stream would ALWAYS take up that much disk space after it initially reaches it, and that could get expensive.
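For reference, the size and age thresholds are set when the stream is declared, using the x-max-length-bytes and x-max-age queue arguments (with x-queue-type set to "stream"). A minimal sketch that builds those arguments follows. The 2 TB / 30 day values and the queue name are placeholders, not recommendations:

```python
from typing import Dict, Optional, Union

TB = 1000 ** 4  # decimal terabyte, in bytes


def stream_arguments(
    max_bytes: int,
    max_age: str,
    segment_bytes: Optional[int] = None,
) -> Dict[str, Union[str, int]]:
    """Build x-arguments for declaring a RabbitMQ stream with retention limits.

    max_age uses RabbitMQ's duration syntax, e.g. "30D" or "12h".
    Retention is evaluated per segment, so segment_bytes can optionally
    set the segment size as well.
    """
    if max_bytes <= 0:
        raise ValueError("max_bytes must be positive")
    args: Dict[str, Union[str, int]] = {
        "x-queue-type": "stream",
        "x-max-length-bytes": max_bytes,
        "x-max-age": max_age,
    }
    if segment_bytes is not None:
        args["x-stream-max-segment-size-bytes"] = segment_bytes
    return args


args = stream_arguments(max_bytes=2 * TB, max_age="30D")
# Declaring it with pika needs a running broker and an open channel:
# channel.queue_declare(queue="rest-requests", durable=True, arguments=args)
```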

What we want is to be able to mark messages in a stream as "fully consumed" and have RMQ delete chunks from disk once all the messages in a chunk are fully consumed (as opposed to deleting them based on stream size or age limits). This way we would have stream scale and semantics without a piece of infrastructure that permanently takes up TBs of disk space in multiple data centers.
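The proposal can be modeled roughly as shown below. This is a hypothetical sketch, not a RabbitMQ feature. Chunks are described by their first and last offsets, and "consumed" is the set of offsets a client has marked. Because a stream is an append-only log, the sketch only deletes chunks from the front. It stops at the first chunk that still holds an unconsumed message:

```python
from dataclasses import dataclass
from typing import List, Set


@dataclass(frozen=True)
class Chunk:
    first_offset: int  # offset of the first message in the chunk
    last_offset: int   # offset of the last message in the chunk (inclusive)


def deletable_chunks(chunks: List[Chunk], consumed: Set[int]) -> List[Chunk]:
    """Return the leading chunks whose every offset has been marked consumed."""
    result = []
    for chunk in chunks:
        offsets = range(chunk.first_offset, chunk.last_offset + 1)
        if not all(o in consumed for o in offsets):
            break  # an unconsumed message blocks this chunk and all later ones
        result.append(chunk)
    return result


chunks = [Chunk(0, 99), Chunk(100, 199), Chunk(200, 299)]
consumed = set(range(150)) | set(range(200, 300))
print(deletable_chunks(chunks, consumed))
# [Chunk(first_offset=0, last_offset=99)]
```

The third chunk is fully consumed but stays, because the second chunk still holds unconsumed messages (150 to 199).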

Is this something that you all have considered? Is there a way to get that behavior with the core or plug-in stream implementations (we're in Python)? Thanks!

Karl Nilsson

Jun 26, 2023, 3:53:50 PM
to rabbitm...@googlegroups.com
No. Streams, as you have noted, converge towards the “full” state whereas queues converge towards the “empty” state. Those are fundamental differences between the two, and starting to maintain records of which messages have been processed isn’t something we feel would be a good way to approach things.

You can dynamically change the retention settings in response to consumer progress but apart from that we have no immediate suggestions. 
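One way to act on that suggestion is to periodically update a policy through the management plugin's HTTP API (PUT /api/policies/{vhost}/{name}), using a max-length-bytes value derived from consumer progress. A minimal sketch that only builds the request follows. It assumes the caller estimates unconsumed_bytes, for example as the slowest consumer's lag times an average message size. The headroom and floor values are arbitrary placeholders. Before relying on this, check in the docs how a policy value interacts with a max-length-bytes x-argument set at declaration time:

```python
import json
from typing import Tuple
from urllib.parse import quote

GB = 1000 ** 3


def retention_policy_request(
    vhost: str,
    policy_name: str,
    stream_pattern: str,
    unconsumed_bytes: int,
    headroom: float = 1.5,
    floor_bytes: int = 50 * GB,
) -> Tuple[str, str]:
    """Return (path, JSON body) for a management HTTP API policy update.

    The size limit is the bytes not yet read by the slowest consumer,
    multiplied by a headroom factor, but never below floor_bytes.
    """
    max_bytes = max(floor_bytes, int(unconsumed_bytes * headroom))
    # The vhost and policy name are path segments, so "/" must become "%2F".
    path = f"/api/policies/{quote(vhost, safe='')}/{quote(policy_name, safe='')}"
    body = {
        "pattern": stream_pattern,
        "apply-to": "queues",  # "queues" matches streams as well
        "definition": {"max-length-bytes": max_bytes},
    }
    return path, json.dumps(body)


path, body = retention_policy_request(
    "/", "rest-requests-retention", "^rest-requests$", unconsumed_bytes=400 * GB
)
print(path)  # /api/policies/%2F/rest-requests-retention
print(json.loads(body)["definition"]["max-length-bytes"])  # 600000000000
```

The returned path and body would be sent as an authenticated HTTP PUT to the management listener (port 15672 by default). Publishing continues between updates, so the update interval and the headroom factor together decide how much unconsumed data is at risk if a burst outpaces them.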

Of course, if someone comes up with a good idea or design for refining how stream retention is evaluated, we’re interested in that.

Cheers
Karl

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/CACjixsz3n800aHpRqwYrMYV-Xp-RD%3D6p_Ncj%3DQaVnY-qf_RFtw%40mail.gmail.com.
--
Karl Nilsson

Artur Wroblewski

Jun 26, 2023, 7:15:36 PM
to rabbitm...@googlegroups.com
On Mon, Jun 26, 2023 at 07:53:47AM -0700, Andrea Frome wrote:
[...]
> Our use case appears to fit streams well EXCEPT for the fact that the only
> way things get deleted from the stream is if the stream exceeds size or age
> limits.

Do you have any time limit for the requests sent by your customers, after
which the requests are considered to be lost, and need to be resent?

[...]

Best regards,

Artur

--
https://mortgage.diy-labs.eu/

Andrea Frome

Jun 26, 2023, 7:23:46 PM
to rabbitmq-users
Thanks for the reply, Karl. The proposal I would make is to have a "marked for deletion" state that is explicitly requested by the client (i.e. the server wouldn't track which consumers have processed a message), and the broker would garbage collect marked items. It is true we are always trying to get to an empty state, but the reality is that we expect to often (if not nearly always) have messages in the stream/queue. From what I've read on this list, we may run into issues with large quorum queues and streams are often cited as the answer.
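The proposal leaves the bookkeeping to the client, and the thread describes two independent consumers (the Postgres loader and the Elasticsearch indexer). A message could therefore only be marked once both consumers are done with it. A minimal sketch of that client-side calculation follows. The consumer names are hypothetical. Each consumer's progress is assumed to be available as the next offset it still needs, for instance from the stream protocol's server-side offset tracking or from the application's own storage:

```python
from typing import Mapping, Optional


def deletion_watermark(next_needed: Mapping[str, Optional[int]]) -> Optional[int]:
    """Return the offset below which every consumer has finished processing.

    next_needed maps a consumer name to the next offset it still needs,
    or None if that consumer has not reported progress yet. Offsets
    strictly below the returned value are safe to mark for deletion.
    """
    if not next_needed or any(offset is None for offset in next_needed.values()):
        return None  # some consumer's progress is unknown: mark nothing
    return min(next_needed.values())


progress = {"postgres-loader": 1_200_000, "es-indexer": 1_950_000}
print(deletion_watermark(progress))  # 1200000
print(deletion_watermark({"postgres-loader": 1_200_000, "es-indexer": None}))  # None
```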

kjnilsson

Jun 27, 2023, 4:30:25 AM
to rabbitmq-users
So something like a "set lowest required offset" type of API and the stream can use that offset and delete any segments lower than that offset?
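To make the idea concrete, here is a hypothetical model of what such a call could do on the broker side. It is not an existing RabbitMQ API, and segments are reduced to their first offsets. A segment can go only when it lies entirely below the lowest required offset, and the newest (active) segment is always kept:

```python
from typing import List


def segments_to_delete(segment_starts: List[int], lowest_required: int) -> List[int]:
    """Return the start offsets of segments lying entirely below lowest_required.

    segment_starts holds each segment's first offset in ascending order; a
    segment ends where the next one begins. The last (active) segment and
    the segment containing lowest_required are never returned.
    """
    doomed = []
    for start, next_start in zip(segment_starts, segment_starts[1:]):
        if next_start <= lowest_required:
            doomed.append(start)  # every offset in [start, next_start) is below it
        else:
            break
    return doomed


print(segments_to_delete([0, 1000, 2000, 3000], lowest_required=2500))  # [0, 1000]
```

Because deletion happens per segment, already-processed messages that share a segment with unprocessed ones stay on disk until the whole segment falls below the offset.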

It's not beyond the realm of possibilities, but the issue with any such functionality is: what if the lowest required offset is never set? Should the stream grow indefinitely then, or do we expect there to be another limit (size, age) as well?

Andrea Frome

Jun 28, 2023, 1:41:59 PM
to rabbitm...@googlegroups.com
I would expect the size/age limit to also apply. In our case we'll set the size/age high enough that we don't expect to hit it (and something has gone horribly wrong if we have).
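Under that answer, the two rules could combine as sketched below. This is again a hypothetical model rather than RabbitMQ behaviour. Size-based retention always applies, and a lowest required offset, when a client has set one, can only remove more. Segments are modeled as (first offset, size in bytes) pairs, oldest first, and the age limit is left out for brevity:

```python
from typing import List, Optional, Tuple

Segment = Tuple[int, int]  # (first_offset, size_in_bytes), oldest first


def retained_segments(
    segments: List[Segment],
    max_bytes: int,
    lowest_required: Optional[int] = None,
) -> List[Segment]:
    """Apply both retention rules and return the segments that survive.

    The newest segment is always kept.
    """
    kept = list(segments)  # copy so the caller's list is not mutated
    total = sum(size for _, size in kept)
    # Size-based retention, as streams already do today.
    while len(kept) > 1 and total > max_bytes:
        total -= kept.pop(0)[1]
    # Offset-based retention, only if a client has ever set the offset:
    # drop a segment when the next one starts at or below the offset.
    if lowest_required is not None:
        while len(kept) > 1 and kept[1][0] <= lowest_required:
            kept.pop(0)
    return kept


segments = [(0, 100), (1000, 100), (2000, 100), (3000, 100)]
print(retained_segments(segments, max_bytes=10_000))
# [(0, 100), (1000, 100), (2000, 100), (3000, 100)]
print(retained_segments(segments, max_bytes=10_000, lowest_required=2500))
# [(2000, 100), (3000, 100)]
```

If the offset is never set, the stream behaves exactly as it does today and stays bounded by max_bytes.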
