pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')


Amardeep Prajapati

Oct 15, 2024, 3:18:44 AM
to Pika
I have a separate consumer for each queue and a single RabbitMQ container that publishes messages across the different queues.
In one process, about 11 lakh (1.1 million) messages were published to the queues, but the consumer container gets killed after consuming a specific number of messages (~7k).
The consumer fails to consume a large volume of messages.

pika==1.3.1 ; python_version == "3.8"

Python code:
def __init__(self, queue: str = '',):
    self.channel = self.connection.channel()
    self.channel.queue_declare(queue=self.queue)
    self.channel.basic_consume(queue=self.queue,
                               on_message_callback=self.callback,
                               auto_ack=True)
    self.channel.start_consuming()


Consumer Logs:
Screenshot from 2024-10-15 12-42-07.png

RabbitMQ Logs:

Screenshot from 2024-10-15 12-40-46.png

Amardeep Prajapati

Oct 15, 2024, 3:26:38 AM
to Pika
for establishing a connection:

try:
    if connection and connection.is_open:
        # self.logger.info(f"Connecting is open")
        self.connection = connection
    else:
        # self.logger.info(f"Reconnecting mq connection")
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(self.msg_host,
                                                                            port=settings.MESSAGE_QUEUE_PORT,
                                                                            heartbeat=0))
except Exception as ex:
    self.handle_exception(ex)

Luke Bakken

Nov 18, 2024, 10:48:42 AM
to Pika
"Connection reset by peer" means that something is killing the TCP connection between your application and RabbitMQ.

royi reshef

Jan 7, 2025, 2:44:52 AM
to Pika

Upon reviewing your server logs, it's evident that the timeouts are originating from the server side.
The root cause is the use of BlockingConnection, which can lead to various issues in high-demand scenarios.
I recommend transitioning to an asyncio-based connection to enhance performance and reliability.

In the meantime, make sure you update the heartbeat settings directly on the server side, as adjusting the connection parameters alone is not sufficient.

For long-running tasks with BlockingConnection, it's also essential to periodically call self.connection.sleep(0) to keep the heartbeat mechanism active.
This workaround is specific to blocking connections and will help maintain connection stability.

Luke Bakken

Jan 7, 2025, 11:20:55 AM
to Pika
>  For long-running tasks with BlockingConnection, it's also essential to periodically call self.connection.sleep(0) to keep the heartbeat mechanism active.

THIS IS NOT TRUE!

If you use BlockingConnection correctly, and do your per-message work on another thread, this is not necessary.
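
A minimal sketch of that pattern, for illustration only: each delivery is handed to a worker thread, and the ack is scheduled back onto the connection's thread with add_callback_threadsafe, since channel methods should not be called from other threads. The helper names (ack_message, do_work, on_message, main), the "localhost" host, the "work" queue name, the pool size, and the process handler are assumptions made for this example and are not taken from the code posted above. Note that it uses manual acks rather than auto_ack=True.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def ack_message(channel, delivery_tag):
    # Runs on the connection's own thread (scheduled via add_callback_threadsafe).
    if channel.is_open:
        channel.basic_ack(delivery_tag=delivery_tag)


def do_work(connection, channel, delivery_tag, body, process):
    # Runs on a worker thread, so the connection thread stays free for I/O.
    process(body)  # hypothetical per-message handler supplied by the application
    # Hand the ack back to the connection thread instead of calling the channel here.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))


def on_message(channel, method, properties, body, connection, executor, process):
    # Pika calls this with (channel, method, properties, body); the remaining
    # arguments are bound with functools.partial in main().
    executor.submit(do_work, connection, channel, method.delivery_tag, body, process)


def main():
    import pika  # imported here so the helpers above can be exercised without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="work")   # placeholder queue name
    channel.basic_qos(prefetch_count=10)  # cap the number of unacked deliveries in flight
    executor = ThreadPoolExecutor(max_workers=4)
    callback = functools.partial(
        on_message, connection=connection, executor=executor, process=print)
    channel.basic_consume(queue="work", on_message_callback=callback)  # manual acks
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    finally:
        executor.shutdown(wait=True)  # let in-flight work finish before closing
        connection.close()
```

Calling main() starts the consumer. With manual acks and a prefetch limit, the broker only sends a bounded number of messages at a time, and anything still unacked when the connection drops is redelivered rather than lost.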

Royi Reshef

Jan 9, 2025, 5:02:18 AM
to pika-...@googlegroups.com
This is true if your thread is doing long-running CPU-bound operations and/or does not release the GIL.
Look here for example:


Luke Bakken

Jan 9, 2025, 10:34:21 AM
to Pika
> This is true if your thread is doing long-running CPU-bound operations and/or does not release the GIL.

If you are doing that in an application that uses Pika you are not using Pika correctly.