Stream consumer hangs when running out of messages

Raz Hemo

Oct 5, 2021, 7:01:09 AM
to rabbitmq-users
Hello,
TL;DR: An AMQP consumer on a stream queue in a RabbitMQ cluster hangs indefinitely once it reaches the end of the stream.

We built a simple application with Python kombu that consumes from a RabbitMQ stream containing many messages (for example, 10k). However, the consumer hangs when it reaches the last message, and when we resume producing messages it does not receive them until it is restarted.

We've managed to figure out that:
* This only happens when consuming from a cluster. With a single RabbitMQ node there is no hang.
* This only happens when consuming through the AMQP 0.9.1 interface. Using the stream plugin, there is no hang.
* The problem also occurs with the Rust client (lapin), so it is not client-specific.

To reproduce the problem, create a stream and:
- start the publisher, then stop it after a few seconds
- start the consumer and wait for it to reach the end of the stream
- start the publisher again
- the consumer does not receive the new messages as they arrive.

Here is our consumer:
```
from kombu import Connection, Exchange, Consumer, Queue  # noqa

def callback(body, message):
    print(message)
    message.ack()

# Stream queue bound to a fanout exchange; reading starts at offset 0.
stream_queue = Queue(
    'predictions',
    exchange=Exchange('predictions', type='fanout', durable=False),
    durable=True,
    queue_arguments={'x-queue-type': 'stream'},
    consumer_arguments={'x-stream-offset': 0},
)

with Connection('amqp://rabbitmq:rabbitmq@rabbit:5672//') as conn:
    # prefetch_count sets basic.qos; messages are acked manually in callback().
    with Consumer(conn, queues=[stream_queue], prefetch_count=1000, callbacks=[callback]):
        while True:
            conn.drain_events()
```
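A consumer that hangs silently is hard to tell apart from one that has simply caught up. One way to make the hang visible is a small idle watchdog updated from the message callback. This is a minimal sketch: `StallWatchdog` is a hypothetical helper, not part of kombu, and it assumes RabbitMQ sets an `x-stream-offset` header on each message delivered from a stream over AMQP 0.9.1 (verify this against your server version). Idleness alone is expected while the producer is paused, so treat `is_stalled()` as a prompt to check the stream status, not as proof of a bug.

```python
import time
from typing import Callable, Optional


class StallWatchdog:
    """Hypothetical helper: flags a consumer that has received nothing
    for longer than `max_idle` seconds. The clock is injectable so the
    logic can be exercised without a broker."""

    def __init__(self, max_idle: float, clock: Callable[[], float] = time.monotonic):
        self.max_idle = max_idle
        self._clock = clock
        self._last_delivery = clock()
        self.last_offset: Optional[int] = None

    def record(self, headers: Optional[dict] = None) -> None:
        """Call once per delivered message, e.g. with message.headers."""
        self._last_delivery = self._clock()
        # Assumption: the broker adds an 'x-stream-offset' header to
        # messages delivered from a stream queue.
        if headers and "x-stream-offset" in headers:
            self.last_offset = int(headers["x-stream-offset"])

    def idle_for(self) -> float:
        """Seconds since the last recorded delivery."""
        return self._clock() - self._last_delivery

    def is_stalled(self) -> bool:
        """True once the consumer has been idle longer than max_idle."""
        return self.idle_for() > self.max_idle
```

To wire it in, call `watchdog.record(message.headers)` in `callback` before `message.ack()`, and replace the bare `conn.drain_events()` with `conn.drain_events(timeout=1)` inside a `try`/`except socket.timeout` block that checks `watchdog.is_stalled()` and logs `watchdog.last_offset`.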

Gabriele Santomaggio

Oct 7, 2021, 6:01:22 AM
to rabbitmq-users
We can't reproduce the issue.

Here is the code. Publisher:
```
from kombu import Connection, Exchange, Consumer, Queue  # noqa

with Connection('amqp://test:test@localhost//') as conn:
    # produce
    producer = conn.Producer(serializer='json')
    for i in range(1, 10000):
        producer.publish({'hello': 'world'},
                         exchange="predictions")
```

Consumer:
```
from kombu import Connection, Exchange, Consumer, Queue, eventloop  # noqa

def callback(body, message):
    print(message)
    message.ack()

with Connection('amqp://test:test@localhost:5672//') as conn:
    with Consumer(conn, queues=[Queue('my-test',
            exchange=Exchange('predictions', type='fanout', durable=False), durable=True,
            queue_arguments={'x-queue-type': 'stream'},
            consumer_arguments={'x-stream-offset': 0})],
            prefetch_count=1000, callbacks=[callback]):
        for _ in eventloop(conn):
            pass
```

Can you post the stream status, for example:
```
rabbitmq-streams stream_status predictions
```

and also the server logs, so we can see whether there are errors.
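To compare the members' positions programmatically, the table printed by `rabbitmq-streams stream_status` can be turned into one dictionary per row. This sketch assumes the box-drawing layout of the outputs posted in this thread (header row first, cells separated by `│`); `parse_stream_status` and `committed_offsets` are hypothetical helpers, and the CLI output format may differ between RabbitMQ versions.

```python
from typing import Dict, List

VBAR = "\u2502"  # the "│" column separator in the CLI's box-drawing tables


def parse_stream_status(text: str) -> List[Dict[str, str]]:
    """Turn `rabbitmq-streams stream_status` table output into row dicts.

    Only lines starting with the vertical bar are table rows; border lines
    and the "Status of stream ..." banner are skipped. The first row found
    is used as the header.
    """
    rows = [
        [cell.strip() for cell in line.strip().strip(VBAR).split(VBAR)]
        for line in text.splitlines()
        if line.strip().startswith(VBAR)
    ]
    if not rows:
        return []
    header, *data = rows
    return [dict(zip(header, row)) for row in data]


def committed_offsets(rows: List[Dict[str, str]]) -> Dict[str, int]:
    """Map each member node to its committed_offset."""
    return {row["node"]: int(row["committed_offset"]) for row in rows}
```

For example, applied to the status posted later in this thread at the moment of the hang, `committed_offsets(parse_stream_status(output))` would map all three nodes to 277050.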

Cheers
-
Gabriele

Raz Hemo

Oct 7, 2021, 8:45:00 AM
to rabbitmq-users
Just to make sure: are you trying to reproduce the issue on a cluster? As I said, there is no problem with a single node; the problem occurs only on a cluster.

Anyway, I didn't see anything relevant in the server logs, and this is the output of stream_status:
Status of stream predictions on node rabbitmq@labbeast1 ...
┌─────────┬─────────────────────┬────────┬──────────────────┬──────────────┬─────────┬──────────┐
│ role    │ node                │ offset │ committed_offset │ first_offset │ readers │ segments │
├─────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ writer  │ rabbitmq@beast29    │ 60541  │ 60541            │ 0            │ 2       │ 1        │
├─────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ replica │ rabbitmq@labbeast1  │ 60541  │ 60541            │ 0            │ 0       │ 1        │
├─────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ replica │ rabbitmq@labthanos1 │ 60541  │ 60541            │ 0            │ 1       │ 1        │
└─────────┴─────────────────────┴────────┴──────────────────┴──────────────┴─────────┴──────────┘

The reader I started this time got stuck after ~150k messages were in the stream. At that point the producer was paused. I restarted the producer, but the consumer did not advance; only after I restarted the consumer could it read further.
I should note that this doesn't reproduce 100% of the time; it sometimes takes some fiddling and several restarts of both the producer and the consumer.
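The symptom described here suggests a simple check for this failure mode: the consumer has been idle for a while even though the stream holds committed messages it has not yet received. A minimal sketch of that heuristic, assuming the consumer records the last `x-stream-offset` header it received and that `committed_offset` is read from `stream_status` (for example from the writer row). `consumer_is_stuck` and its thresholds are hypothetical, not a RabbitMQ or kombu API.

```python
from typing import Optional


def consumer_is_stuck(
    last_seen_offset: Optional[int],
    committed_offset: int,
    idle_seconds: float,
    max_idle: float,
) -> bool:
    """Hypothetical heuristic: report a stuck consumer only when it has been
    idle for longer than max_idle AND the stream has committed data beyond
    the last offset the consumer received.

    An idle consumer that is fully caught up (e.g. the producer is paused)
    is not reported.
    """
    if idle_seconds <= max_idle:
        return False  # still receiving, or not idle long enough to judge
    # A consumer that has received nothing yet is treated as being at -1.
    last = -1 if last_seen_offset is None else last_seen_offset
    return last < committed_offset
```

With the numbers from this report, a consumer stalled around offset 150000 while `committed_offset` reads 277050 would be flagged once it exceeds `max_idle`, whereas a consumer sitting at 277050 would not.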

Raz Hemo

Oct 7, 2021, 8:47:17 AM
to rabbitmq-users
Sorry, I copied the wrong output. This is the output of stream_status at the moment the hang occurred:

Status of stream predictions on node rabbitmq@labbeast1 ...
┌─────────┬─────────────────────┬────────┬──────────────────┬──────────────┬─────────┬──────────┐
│ role    │ node                │ offset │ committed_offset │ first_offset │ readers │ segments │
├─────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ writer  │ rabbitmq@beast29    │ 277050 │ 277050           │ 0            │ 2       │ 1        │
├─────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ replica │ rabbitmq@labbeast1  │ 277050 │ 277050           │ 0            │ 1       │ 1        │
├─────────┼─────────────────────┼────────┼──────────────────┼──────────────┼─────────┼──────────┤
│ replica │ rabbitmq@labthanos1 │ 277050 │ 277050           │ 0            │ 0       │ 1        │
└─────────┴─────────────────────┴────────┴──────────────────┴──────────────┴─────────┴──────────┘

Karl Nilsson

Oct 7, 2021, 9:21:22 AM
to rabbitmq-users
This looks like a bug. Looking at the code, I can see something that doesn't look quite right. If I create a branch with a few changes, would you be able to run your tests against the Docker image artifact that gets created?

Cheers
Karl

--
Karl Nilsson

Raz Hemo

Oct 7, 2021, 9:32:41 AM
to rabbitmq-users
Yes, I'd be able to do that. Thanks :)
Out of curiosity, I'd be happy to know what went wrong.

Karl Nilsson

Oct 7, 2021, 10:46:21 AM
to rabbitmq-users
https://hub.docker.com/layers/pivotalrabbitmq/rabbitmq/stream-queue-consumer-fix-otp-max/images/sha256-59b33eb5ca476e57bea72962c4b4138b79499c2b7fc20a0b934a92371cf9010d?context=explore

I don't know for sure that this will fix it, but the code was doing something internally that crossed RabbitMQ nodes when it didn't need to. In theory, that could trigger unexpected race conditions. Either way, it is a good change, as it avoids cross-node communication for common usage patterns.



--
Karl Nilsson

Raz Hemo

Oct 10, 2021, 4:35:02 AM
to rabbitmq-users
Hi,
So far I've been trying to reproduce the issue with your image, and it seems like it no longer hangs. It does sometimes give me a "connection refused" error when initializing, though, but at least that's a more easily manageable problem. :)
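If the connection refused error turns out to be transient (for example, during startup), retrying with exponential backoff is a common workaround. kombu already provides `Connection.ensure_connection(max_retries=...)` for this; the dependency-free sketch below only illustrates the idea. `connect_with_retry` is a hypothetical helper, and since kombu may wrap socket errors in its own exception types, pass whatever exception your client actually raises via `retry_on`.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionRefusedError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: call `connect` until it succeeds, doubling the
    delay after each retryable failure. The last exception is re-raised
    once all attempts are used up."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return connect()
        except retry_on:
            if attempt == attempts - 1:
                raise
            # Delays grow as base_delay, 2*base_delay, 4*base_delay, ...
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")
```

Injecting `sleep` keeps the backoff schedule testable without actually waiting.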

Raz Hemo

Oct 10, 2021, 4:39:56 AM
to rabbitmq-users
I'm not sure whether the connection refused error is a RabbitMQ problem, as it could be something on our end. Either way, I'm happy about the fix :)

Karl Nilsson

Oct 10, 2021, 4:58:58 AM
to rabbitm...@googlegroups.com
Great, thanks for testing!

--
Karl Nilsson

Gabriele Santomaggio

Oct 12, 2021, 9:38:14 AM
to rabbitmq-users
Raz,
Fixed here: https://github.com/rabbitmq/rabbitmq-server/pull/3550
It will be in the next RabbitMQ release.

Cheers
-
Gabriele

