StreamLostError when publishing messages

Pradyumna Mahajan

May 21, 2024, 1:27:29 PM
to Pika
Hi Team,
I am creating a microservice that sends messages to a queue, and for it I am writing a common RabbitMQ library. If no message is sent for some time, the next message fails with the following error:

pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

From what I have read, the server may close idle connections, so I have written my code to reconnect if the connection is closed:

    def get_rabbitmq_connection(self) -> pika.BlockingConnection:
        # Reuse the cached connection unless none exists yet or pika has already marked it closed.
        should_make_new_connection = (self._rabbitmq_connection is None or self._rabbitmq_connection.is_closed)
        if should_make_new_connection:
            logging.info("Connection was empty or closed. Creating new connection")
            self._rabbitmq_connection = self._get_new_rabbitmq_connection()

        return self._rabbitmq_connection

But in practice, this does not always work. One message fails to send (raising the StreamLostError), and only the next call actually sends a message. When I checked the server logs, these messages appear consistently:

  • 2024-05-17 21:49:15.398578+05:30 [error] <0.19818.28> missed heartbeats from client, timeout: 60s
  • 2024-05-17 21:49:15.399243+05:30 [debug] <0.20373.28> Closing all channels from connection '10.20.x.x:33152 -> 10.20.x.x:5672' because it has been closed
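A likely reason for the single failing publish: a BlockingConnection only does network I/O when you call into it, so after the broker drops an idle connection for missed heartbeats, `is_closed` can still be False until the next publish hits the dead socket. Below is a minimal sketch of probing the connection before reusing it. It assumes that a non-blocking `process_data_events(time_limit=0)` call is enough to surface the dead socket once the broker's reset has arrived; that is an assumption, not a guarantee. `get_live_connection` and `connect` are hypothetical names (`connect` stands in for `_get_new_rabbitmq_connection`), and in real use `retry_on` would be `(pika.exceptions.AMQPConnectionError,)`; it is a parameter only so the sketch runs without a broker.

```python
import logging
from typing import Any, Callable, Optional, Tuple, Type


def get_live_connection(
    current: Optional[Any],
    connect: Callable[[], Any],
    retry_on: Tuple[Type[BaseException], ...],
) -> Any:
    """Return `current` if it still responds, otherwise a fresh connection.

    `current` is expected to look like pika.BlockingConnection
    (`is_open`, `process_data_events`); `connect` is a hypothetical factory.
    """
    if current is not None and current.is_open:
        try:
            # Pump pending I/O without blocking. If the broker already dropped
            # the socket, this raises here instead of inside basic_publish.
            current.process_data_events(time_limit=0)
            return current
        except retry_on as exc:
            logging.info("Stale connection detected (%r); reconnecting", exc)
    return connect()
```

Calling this at the top of `get_rabbitmq_connection` (instead of checking only `is_closed`) moves the failure from the publish to a cheap probe, so the message itself is sent on a fresh connection.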

Should I just use the recovery approach described in https://pika.readthedocs.io/en/stable/modules/adapters/index.html#connection-recovery, i.e., retry on AMQPConnectionError? In that case, why are channel errors not handled? Can a channel not be closed while the connection stays intact?
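On the retry question: in pika 1.x, `StreamLostError` is a subclass of `AMQPConnectionError`, while channel-level failures such as `ChannelClosedByBroker` derive from `AMQPChannelError`. A channel can be closed by the broker (for example after publishing to a non-existent exchange) while the connection stays open, so a publisher may want to handle both. Below is a minimal retry-once wrapper in the spirit of the pika recovery docs, not taken from them; `call_with_reconnect`, `operation` and `reset` are hypothetical names you would bind to your own publisher.

```python
import logging
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_reconnect(
    operation: Callable[[], T],
    reset: Callable[[], None],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 2,
    delay: float = 0.0,
) -> T:
    """Run `operation`; on a retryable error, call `reset` and try again.

    With pika, `retry_on` would typically be
    (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError),
    and `reset` would drop the cached connection/channel so the next
    `operation` call opens fresh ones.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            logging.warning("Attempt %d failed (%r); resetting and retrying", attempt, exc)
            reset()
            if delay:
                time.sleep(delay)
    raise AssertionError("unreachable")
```

Usage might look like `call_with_reconnect(lambda: get_channel().basic_publish(exchange="", routing_key="my_queue", body=b"..."), reset=drop_cached_connection, retry_on=(pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError))`, where `get_channel` and `drop_cached_connection` are hypothetical helpers. Retrying after a lost stream can publish a message twice if the first attempt actually reached the broker; publisher confirms (`channel.confirm_delivery()`) narrow that window but do not remove it.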

Please help me understand what I am doing wrong.
Any help would be greatly appreciated!

Paul Lam

May 21, 2024, 2:03:32 PM
to Pika
Good day. I have run into the same, or a similar, issue before.
I haven't tried the recovery options, but judging from the docs, the connection errors are still raised there, since the missed-heartbeat problem itself persists.

The best solution is either to find a way to send heartbeats back to the RabbitMQ server, or to NACK and requeue the message within a certain timeframe (which also sends traffic, and therefore a heartbeat, to the server).

There are two approaches to resolve this issue:
- If the heartbeat timeout is known and consistent, set it explicitly: pika.BlockingConnection(pika.ConnectionParameters(rabbit_server, heartbeat=SECONDS_TILL_HEARTBEAT_TIMEOUT))
- If the heartbeat timeout isn't known or isn't consistent, use SIGALRM with signal.ITIMER_REAL to set an interval and send a heartbeat every i seconds. When i <= SECONDS_TILL_HEARTBEAT_TIMEOUT - 10 seconds or so, call:
   self.blocking_connection.process_data_events()
to send the heartbeat to the RabbitMQ server manually.
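The SIGALRM approach from the second bullet could be sketched roughly as follows. It is POSIX only, and Python runs signal handlers in the main thread. `start_heartbeat_timer` and `stop_heartbeat_timer` are hypothetical helpers; `connection` is anything with a `process_data_events` method, such as a pika.BlockingConnection, and `interval` should stay comfortably below the heartbeat timeout. One caution that goes beyond the post: the handler can fire while the main code is already inside another pika call, and this sketch does nothing to guard against that re-entrancy.

```python
import signal
from typing import Any


def start_heartbeat_timer(connection: Any, interval: float) -> None:
    """Call connection.process_data_events() every `interval` seconds via SIGALRM.

    POSIX only. The handler runs in the main thread and may interrupt
    other pika calls; this sketch does not guard against that.
    """
    def _on_alarm(signum, frame):
        # Lets pika run its due timers, including sending heartbeat frames.
        connection.process_data_events(time_limit=0)

    signal.signal(signal.SIGALRM, _on_alarm)
    # ITIMER_REAL fires SIGALRM after `interval` seconds, then every `interval` seconds.
    signal.setitimer(signal.ITIMER_REAL, interval, interval)


def stop_heartbeat_timer() -> None:
    """Cancel the timer and restore the default SIGALRM behaviour."""
    signal.setitimer(signal.ITIMER_REAL, 0)
    signal.signal(signal.SIGALRM, signal.SIG_DFL)
```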

Note that process_data_events is only available on BlockingConnection, not on the other connection types. For other connection types, you may need to figure out how to detect the timeout in other services and use that to send a NACK so messages can be requeued.

I have been stumbling over this for a decade and managed to resolve it. Hope that helps! Good luck and have a nice day.

Best,
Paul

Paul Lam

May 21, 2024, 2:07:33 PM
to Pika
Whoops, referring to the post above: I meant 'When i >= SECONDS_TILL_HEARTBEAT_TIMEOUT - 10 seconds' instead.

Pradyumna Mahajan

May 22, 2024, 12:55:32 AM
to Pika
I see. I have not set the heartbeat, which means the value proposed by the server should be used, right? Is that fine?

"The best solution is either to find ways to send heartbeat back to the rabbitmq server": so I will have to use the threading approach, right? A thread that calls process_data_events? And I will need a thread-safe callback too, right?

Paul Lam

May 22, 2024, 5:59:46 PM
to Pika
If the heartbeat is not set, then according to this article: https://www.rabbitmq.com/docs/heartbeats, the default heartbeat timeout seems to be 60 seconds.
In newer pika versions (> 0.8.x), a heartbeat set on the client side overrides the server-side heartbeat (if one is set).
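To see what was actually negotiated, pika 1.x also accepts a callable for the `heartbeat` parameter of `ConnectionParameters`: it is called with the connection and the broker's proposed timeout, and its return value is used. Below is a minimal sketch that logs the proposal and caps it. `choose_heartbeat`, `keepalive_interval` and the 30-second ceiling are hypothetical example values, not pika defaults, and `keepalive_interval` assumes RabbitMQ's documented behaviour of sending heartbeat frames roughly every timeout / 2 seconds.

```python
import logging
from typing import Any

MAX_HEARTBEAT_SECONDS = 30  # hypothetical example ceiling, not a pika or RabbitMQ default


def choose_heartbeat(connection: Any, broker_proposal: int) -> int:
    """Heartbeat negotiation callback: log the broker's value and cap it.

    A broker proposal of 0 means heartbeats are disabled on the server side;
    this sketch turns them back on at the ceiling in that case.
    """
    logging.info("Broker proposed a heartbeat timeout of %ss", broker_proposal)
    if broker_proposal <= 0:
        return MAX_HEARTBEAT_SECONDS
    return min(broker_proposal, MAX_HEARTBEAT_SECONDS)


def keepalive_interval(heartbeat_timeout: float) -> float:
    """How often to call process_data_events() so heartbeats go out in time."""
    # Heartbeat frames are expected about every timeout / 2 seconds.
    return heartbeat_timeout / 2
```

It would be passed as `pika.ConnectionParameters(rabbit_server, heartbeat=choose_heartbeat)`. With the default 60-second timeout, this works out to pumping the connection about every 30 seconds.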

As for threading: no, do not even attempt threading unless you have already tried everything you can (e.g., calling blocking_connection.process_data_events, sending NACK to requeue, etc.) and you are sure it still does not work. I made this mistake, and it added unnecessary complexity to my application and wasted a lot of my time.
Threading is a last resort, since some services/packages are not thread-safe, and threads are prone to race conditions that require careful management. So why not use a simpler solution that is just as robust instead?

BlockingConnection.process_data_events is a method in pika. It processes pending I/O and timer events, which includes sending heartbeat frames, so it tells the server that the client is still alive. Calling BlockingConnection.process_data_events periodically, before each heartbeat timeout elapses, therefore keeps heartbeats flowing to the server; to my understanding, this is probably one of the best solutions. I would recommend digging into the pika code to understand it further, as the online documentation alone is insufficient. For example, git clone the pika repository and search for process_data_events to read more about what it does (the code there is well commented).
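A thread-free way to follow this advice in a publisher is to pump the connection opportunistically from code the service already runs, for example at the start of each request or loop iteration. Below is a minimal sketch. `HeartbeatPump` is a hypothetical helper, `connection` is anything with a `process_data_events` method (such as pika.BlockingConnection), and the clock is injectable only so the logic can be tested. It only helps if something calls `maybe_pump()` more often than the heartbeat interval; a service that can sit completely idle for longer than the timeout still needs a timer or the reconnect-on-failure handling from the original post.

```python
import time
from typing import Any, Callable


class HeartbeatPump:
    """Call process_data_events() at most once per `interval` seconds."""

    def __init__(self, connection: Any, interval: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._connection = connection
        self._interval = interval
        self._clock = clock
        self._last = clock()

    def maybe_pump(self) -> bool:
        """Pump pending I/O (including heartbeats) if `interval` has elapsed."""
        now = self._clock()
        if now - self._last < self._interval:
            return False
        # Non-blocking: lets pika run due timers, such as its heartbeat sender.
        self._connection.process_data_events(time_limit=0)
        self._last = now
        return True
```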

If threads are not used, there is no point in using a thread-safe callback. If you are using threads, then yes.

Good luck! Hope this helps. Use your judgment before taking on the complexity of multithreading.

