TL;DR - When an AMQP consumer on a stream queue in a RabbitMQ cluster reaches the end of the stream, it hangs indefinitely.
We made a simple application using Python kombu that consumes from a RabbitMQ stream containing many messages (for example 10k). The consumer hangs when it reaches the last message; when message production resumes, the consumer does not receive the new messages until it is restarted.
We've managed to figure out that:
* This only happens when consuming from a cluster. With a single RabbitMQ node there is no hang.
* This only happens when consuming over AMQP. When consuming via the stream plugin (the stream protocol), there is no hang.
* The problem also occurs with the Rust client (lapin), so it is not client-specific.
To reproduce the problem, create a stream and:
- start the publisher, stop it after a few seconds
- start the consumer and wait for it to reach the end of the stream
- start the publisher again
- the consumer does not receive the new messages as they arrive
Here is our consumer:
```python
from kombu import Connection, Exchange, Consumer, Queue  # noqa


def callback(body, message):
    print(message)
    message.ack()


queue = Queue(
    'predictions',
    exchange=Exchange('predictions', type='fanout', durable=False),
    durable=True,
    queue_arguments={'x-queue-type': 'stream'},
    consumer_arguments={'x-stream-offset': 0},
)

with Connection('amqp://rabbitmq:rabbitmq@rabbit:5672//') as conn:
    with Consumer(conn, queues=[queue], prefetch_count=1000, callbacks=[callback]):
        while True:
            conn.drain_events()
```