ConnectionResetError when implementing an asynchronous consumer


Nevermind

Dec 12, 2019, 10:50:44 AM
to Pika
I'm using pika 1.1.0 with Python 3.6 and trying to implement an asynchronous consumer according to this example. My source code is attached to this post.

During testing, I frequently got the following exception. Although the program reconnected afterwards, I would like to know the main cause of this issue and how I can handle it properly.

AMQP broker: RabbitMQ
Version: 3.7.8


  File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 1041, in _on_socket_readable
    self._consume()
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 791, in _consume
    data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES)
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 846, in _sigint_safe_recv
    return sock.recv(max_bytes)
ConnectionResetError: [Errno 104] Connection reset by peer



code.py

Luke Bakken

Dec 12, 2019, 10:52:59 AM
to Pika
Hello,

Thanks for the information.

What is logged by RabbitMQ at the same time as this error?

"Connection reset by peer" is usually caused by network, firewall or load balancer issues. Is your Python application connecting to RabbitMQ via a load balancer, over the internet, etc?

Thanks,
Luke

Nevermind

Dec 12, 2019, 12:10:01 PM
to Pika
In my case, I run 3 Python consumers at the same time. The consumers and the RabbitMQ server are on the same LAN, with no load balancers, proxies, etc.

The corresponding RabbitMQ log is as follows:

2019-12-12 23:52:54.238 [info] <0.29590.609> accepting AMQP connection <0.29590.609> (10.10.0.104:43180 -> 10.10.0.124:5672)
2019-12-12 23:52:54.596 [info] <0.29590.609> connection <0.29590.609> (10.10.0.104:43180 -> 10.10.0.124:5672): user 'my_app' authenticated and granted access to vhost 'my_app'
2019-12-12 23:53:40.313 [info] <0.29554.609> accepting AMQP connection <0.29554.609> (10.10.0.107:36144 -> 10.10.0.124:5672)
2019-12-12 23:53:40.321 [info] <0.29554.609> connection <0.29554.609> (10.10.0.107:36144 -> 10.10.0.124:5672): user 'my_app' authenticated and granted access to vhost 'my_app'
2019-12-12 23:55:54.599 [error] <0.29590.609> closing AMQP connection <0.29590.609> (10.10.0.104:43180 -> 10.10.0.124:5672):
missed heartbeats from client, timeout: 60s
2019-12-12 23:56:40.322 [error] <0.29554.609> closing AMQP connection <0.29554.609> (10.10.0.107:36144 -> 10.10.0.124:5672):
missed heartbeats from client, timeout: 60s
2019-12-12 23:57:14.249 [info] <0.29702.609> accepting AMQP connection <0.29702.609> (10.10.0.106:50570 -> 10.10.0.124:5672)



Luke Bakken

Dec 12, 2019, 12:26:25 PM
to Pika
Hello,

This is the relevant error:

missed heartbeats from client, timeout:

When you say you run 3 python consumers at the same time, do you mean three separate Python processes?

Nevermind

Dec 12, 2019, 12:30:31 PM
to Pika
Exactly. My program runs as a distributed application, so I dockerized it and ran 3 containers separately.

On Friday, December 13, 2019 at 00:26:25 UTC+7, Luke Bakken wrote:

Luke Bakken

Dec 12, 2019, 3:13:04 PM
to Pika
Hi there,

So it seems as though there is probably a networking issue between your code and RabbitMQ. The code appears correct in that it's not blocking Pika's I/O loop, which is the usual cause of "missed heartbeat" errors.
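To illustrate why a blocked I/O loop leads to "missed heartbeats", here is a rough model on a virtual clock, not Pika's actual scheduler. The function name `max_heartbeat_gap` and the 30-second send interval are assumptions made for illustration; only the 60-second timeout comes from the broker log above.

```python
def max_heartbeat_gap(callbacks, interval, end_time):
    """Longest silence between heartbeats on a single-threaded loop.

    callbacks: (start, duration) pairs, sorted by start and non-overlapping.
    A heartbeat that falls due while a callback is running has to wait
    until that callback returns.
    """
    sent = [0]
    due = interval
    while due <= end_time:
        send_at = due
        for start, duration in callbacks:
            if start <= send_at < start + duration:
                send_at = start + duration  # loop was busy; heartbeat is late
        sent.append(send_at)
        due = send_at + interval
    return max((b - a for a, b in zip(sent, sent[1:])), default=0)


# A 5-second callback never delays a heartbeat; a 200-second one does.
quick = max_heartbeat_gap([(10, 5)], interval=30, end_time=300)
slow = max_heartbeat_gap([(10, 200)], interval=30, end_time=300)
print(quick, slow)  # 30 210
```

In this model the 200-second callback leaves the broker without a heartbeat for 210 seconds, well past a 60-second timeout, which is the situation in which a broker would close the connection.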

I recommend running RabbitMQ and your code, outside of Docker, on the same machine, to prove that it works correctly.

Thanks,
Luke

Nevermind

Dec 12, 2019, 11:22:00 PM
to Pika
Thanks for your suggestion, I will try it.

However, according to my observations, the exception appears when the task in the on_message() method takes a long time. In other cases, no exceptions appear.
In the attached source code, I removed the real code and left just a print() line. The real method contains conditional statements: depending on the message received, it runs different processing instructions, and their processing times differ as well. Sorry if this caused you to misunderstand my application logic.

Moreover, my application is designed to run as a distributed application (many consumers run separately at the same time), and each consumer listens to several specific queues. So I'm not sure whether the asynchronous consumer is the best practice for this case.

Can you take a look at my source code again and give me some advice? Thank you.

Luke Bakken

Dec 13, 2019, 12:13:56 PM
to Pika
Hello,

Any work you do in a callback method blocks Pika's I/O loop because it runs on the same thread. So, you must do your work on another thread, and correctly acknowledge the message when that work is done. The ioloop instance has an add_callback_threadsafe method you can use in a manner similar to this:
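A minimal sketch of that pattern (not the code originally linked from this post): the message callback starts a worker thread and returns at once, and the worker hands the acknowledgement back to the I/O loop through `add_callback_threadsafe`. `FakeIOLoop`, `FakeChannel` and `do_work` are hypothetical stand-ins so the sketch runs without a broker; with a real `SelectConnection` the loop object is `connection.ioloop`.

```python
import functools
import threading
import time
from types import SimpleNamespace


def do_work(body):
    # Placeholder for the real long-running task (hypothetical).
    time.sleep(0.05)


def ack_message(channel, delivery_tag):
    # Runs on the I/O loop thread, where channel methods are safe to call.
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def worker(connection, channel, delivery_tag, body):
    # Runs on a worker thread: do the slow work, then hand the ack to the loop.
    do_work(body)
    cb = functools.partial(ack_message, channel, delivery_tag)
    connection.ioloop.add_callback_threadsafe(cb)


def on_message(connection, channel, method, properties, body):
    # Runs on the I/O loop thread: start the worker and return immediately.
    thread = threading.Thread(
        target=worker, args=(connection, channel, method.delivery_tag, body)
    )
    thread.start()
    return thread


# --- Stand-ins so the sketch runs without a broker (not part of Pika) ---
class FakeIOLoop:
    def __init__(self):
        self._callbacks = []
        self._lock = threading.Lock()

    def add_callback_threadsafe(self, callback):
        with self._lock:
            self._callbacks.append(callback)

    def run_pending(self):
        with self._lock:
            callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback()


class FakeChannel:
    is_open = True

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


connection = SimpleNamespace(ioloop=FakeIOLoop())
channel = FakeChannel()
thread = on_message(connection, channel, SimpleNamespace(delivery_tag=1), None, b"job")
thread.join()                    # wait for the worker to finish
connection.ioloop.run_pending()  # a real I/O loop runs queued callbacks itself
print(channel.acked)             # [1]
```

With a real channel, one way to register the callback would be `channel.basic_consume(queue_name, functools.partial(on_message, connection))`, where `queue_name` is whatever queue the consumer listens to.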

Nevermind

Dec 14, 2019, 6:36:35 AM
to Pika
Thank you. It worked for me.

On Saturday, December 14, 2019 at 00:13:56 UTC+7, Luke Bakken wrote:

Luke Bakken

Dec 14, 2019, 10:56:47 AM
to Pika
Great! Thanks for letting us know.

Nevermind

Dec 30, 2019, 3:59:39 AM
to Pika
I thought I was done but sadly, I was wrong.

In the last version, I just put the "worker" method into a thread but did not use add_callback_threadsafe.
Now the exceptions continue to appear. I tried to use add_callback_threadsafe but got the error "SelectConnection does not have add_callback_threadsafe method".



On Saturday, December 14, 2019 at 22:56:47 UTC+7, Luke Bakken wrote:

Nevermind

Dec 30, 2019, 5:09:17 AM
to Pika
I used self._connection.ioloop.add_callback_threadsafe(cb) to resolve the method call problem. However, the exceptions continue to appear. The code is attached here.
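For readers who hit the same "does not have add_callback_threadsafe" error: in Pika 1.x the method lives on `connection.ioloop` for `SelectConnection` and directly on the connection for `BlockingConnection`. The helper below is a hypothetical convenience (the name `schedule_threadsafe` is not a Pika API) that picks whichever is present; the `SimpleNamespace` objects are stand-ins so it runs without a broker.

```python
from types import SimpleNamespace


def schedule_threadsafe(connection, callback):
    """Queue `callback` to run on the connection's own thread."""
    ioloop = getattr(connection, "ioloop", None)
    if ioloop is not None and hasattr(ioloop, "add_callback_threadsafe"):
        # SelectConnection style: the method is on the I/O loop object.
        ioloop.add_callback_threadsafe(callback)
    else:
        # BlockingConnection style: the method is on the connection itself.
        connection.add_callback_threadsafe(callback)


def noop():
    pass


# Stand-ins that only record what was scheduled (not real Pika objects).
scheduled = []
select_like = SimpleNamespace(
    ioloop=SimpleNamespace(add_callback_threadsafe=scheduled.append)
)
blocking_like = SimpleNamespace(add_callback_threadsafe=scheduled.append)

schedule_threadsafe(select_like, noop)
schedule_threadsafe(blocking_like, noop)
print(len(scheduled))  # 2
```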

Besides, I got some warnings:

2019-12-30 09:49:50,499 PC WARNING WRITE indicated on fd=7, but writer callback is None; events=0b100


2019-12-30 10:04:32,360 PC ERROR _AsyncBaseTransport._consume() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=9, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('10.244.1.14', 43264)>; Caller's stack:
Traceback (most recent call last):

 File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 1041, in _on_socket_readable
   self._consume()
 File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 791, in _consume
   data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES)
 File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
   return func(*args, **kwargs)
 File "/usr/local/lib/python3.6/site-packages/pika/adapters/utils/io_services_utils.py", line 846, in _sigint_safe_recv
   return sock.recv(max_bytes)
ConnectionResetError: [Errno 104] Connection reset by peer
Traceback (most recent call last):
code.py

Luke Bakken

Dec 30, 2019, 10:37:51 AM
to Pika
Hello -

Could you please provide RabbitMQ logs from the same time as these exceptions?

Thanks,
Luke

Nevermind

Dec 30, 2019, 10:44:54 AM
to Pika
According to the logs, the problem is still missed heartbeats:

2019-12-30 22:42:12.941 [info] <0.9233.998> accepting AMQP connection <0.9233.998> (10.10.0.153:44259 -> 10.10.0.124:5672)
2019-12-30 22:42:12.949 [info] <0.9233.998> connection <0.9233.998> (10.10.0.153:44259 -> 10.10.0.124:5672): user 'my_app' authenticated and granted access to vhost 'my_app'
2019-12-30 22:42:13.344 [error] <0.8315.998> closing AMQP connection <0.8315.998> (10.10.0.152:7799 -> 10.10.0.124:5672):
missed heartbeats from client, timeout: 60s
2019-12-30 22:42:13.366 [info] <0.9199.998> accepting AMQP connection <0.9199.998> (10.10.0.152:5966 -> 10.10.0.124:5672)
2019-12-30 22:42:13.375 [info] <0.9199.998> connection <0.9199.998> (10.10.0.152:5966 -> 10.10.0.124:5672): user 'my_app' authenticated and granted access to vhost 'my_app'
2019-12-30 22:42:13.522 [error] <0.8329.998> closing AMQP connection <0.8329.998> (10.10.0.154:44512 -> 10.10.0.124:5672):

Luke Bakken

Dec 30, 2019, 10:48:33 AM
to Pika
Thank you for confirming that. I'm looking at the code now.

Luke Bakken

Dec 30, 2019, 5:54:19 PM
to Pika
Hello,

You appear to be doing your work and acknowledging it correctly via another thread.

What does the run_plugin method do? It will block Pika when executed.

Thanks,
Luke

Nevermind

Dec 30, 2019, 10:50:34 PM
to Pika
run_plugin() is called by my working() method and does the main job.
How does it block the I/O loop?

Luke Bakken

Dec 31, 2019, 12:48:40 AM
to Pika
Hello,

Nothing in the code you shared actually calls run_plugin, which is why I'm a bit confused.

Is code.py the code you're actually using when you see this issue?

Thanks,
Luke

Nevermind

Dec 31, 2019, 12:54:52 AM
to Pika
Hello,

For privacy reasons, I've changed the code a little. Basically, the working() method does some processing and calls the run_plugin() method.

Everything else is unchanged. However, I don't think the root cause is in the working() or run_plugin() method.

On Tuesday, December 31, 2019 at 12:48:40 UTC+7, Luke Bakken wrote:

Luke Bakken

Dec 31, 2019, 12:31:56 PM
to Pika
Hello,


There are no missed heartbeat errors because the long-running task is on another thread. There isn't an issue with Pika ... something in your code that you haven't shared is causing the issue, or something in your environment.

If you replace your actual work with a time.sleep call, do you still see the issue? Can you run with DEBUG logging?
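A minimal sketch of that experiment, assuming the standard `logging` module is how the application logs: enable DEBUG output (Pika's own loggers live under the `pika` name) and swap the real task for a plain sleep. `fake_work` and the log format are illustrative choices, not part of the original code.

```python
import logging
import time

# Including the thread name makes it easy to see which thread logged a line.
LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s %(threadName)s: %(message)s"
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)


def fake_work(seconds):
    """Stand-in for the real task: sleeps instead of doing any processing."""
    LOGGER.debug("work started, sleeping %.2f s", seconds)
    time.sleep(seconds)
    LOGGER.debug("work finished")
    return seconds


# In the consumer, call fake_work(...) wherever the real task used to run.
result = fake_work(0.01)
```

If the connection stays up with the sleep in place, the cause is more likely in the real task or in how it touches the channel than in the consumer skeleton.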

Thanks -
Luke

Nevermind

Dec 31, 2019, 12:36:02 PM
to Pika
Thank you. I'm looking at your code and will give feedback soon.

On Wednesday, January 1, 2020 at 00:31:56 UTC+7, Luke Bakken wrote:

Assel Aliyeva

Jan 2, 2020, 8:30:07 PM
to Pika
I'm facing the same error. Please let me know if you found a solution.

Alexander Veysov

Jan 7, 2020, 1:09:40 AM
to Pika
Hi guys,

I am facing a similar issue.

In my case all the symptoms are really similar, the only differences being:

- I base my code on this example, but I process my messages in batches (I run a GPU workload where batching is optimal and highly desired)
- When a batch is full I just process it, but when it is not, I just create a Timer object that would process a smaller batch shortly
- I use it for an RPC workload (but that does not really change anything I guess?)

Am I doing something wrong?

Any pointers appreciated!
My consumer code is attached
async_batch_consumer.py
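The batching scheme described above (process a full batch at once, otherwise start a timer that flushes a smaller batch) might look roughly like the sketch below. This is not the attached file; `BatchCollector` and its parameters are hypothetical names. Note that a timer-triggered flush runs on the `threading.Timer` thread, so any acknowledgement or publish done from `process_batch` would still need to go through `add_callback_threadsafe`.

```python
import threading


class BatchCollector:
    """Collects items and hands them to `process_batch` in groups."""

    def __init__(self, batch_size, max_wait, process_batch):
        self._batch_size = batch_size
        self._max_wait = max_wait          # seconds before a partial batch is flushed
        self._process_batch = process_batch
        self._items = []
        self._timer = None
        self._lock = threading.Lock()

    def add(self, item):
        batch = None
        with self._lock:
            self._items.append(item)
            if len(self._items) >= self._batch_size:
                batch = self._take_locked()
            elif self._timer is None:
                # First item of a partial batch: flush it later if it never fills.
                self._timer = threading.Timer(self._max_wait, self.flush)
                self._timer.daemon = True
                self._timer.start()
        if batch:
            self._process_batch(batch)

    def flush(self):
        with self._lock:
            batch = self._take_locked()
        if batch:
            self._process_batch(batch)

    def _take_locked(self):
        # Caller must hold the lock. Cancels any pending timer and empties the buffer.
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        batch, self._items = self._items, []
        return batch


batches = []
collector = BatchCollector(batch_size=2, max_wait=60, process_batch=batches.append)
collector.add("a")
collector.add("b")   # batch is full, processed immediately
collector.add("c")   # partial batch, timer started
collector.flush()    # flush by hand instead of waiting for the timer
print(batches)       # [['a', 'b'], ['c']]
```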

Alexander Veysov

Jan 7, 2020, 2:22:57 AM
to Pika
Do not want to celebrate prematurely, but it looks like implementing key ideas from this example https://gist.github.com/lukebakken/765672f0b68cdc812102a44008c9923b into my RPC client solved the issue!

Many thanks to Luke Bakken for his examples and feedback in this thread.

Anyway, the key changes that I made are the following:

- replace my "do_work" calls with a threaded version of the call

            thread = threading.Thread(target=self.process)
            thread.start()

- acknowledge messages via `self._connection.ioloop.add_callback_threadsafe`

                cb = functools.partial(self.acknowledge_message, delivery_tag)
                self._connection.ioloop.add_callback_threadsafe(cb)


- send back the RPC responses via  `self._connection.ioloop.add_callback_threadsafe` as well

                cb = functools.partial(self.rpc_response, ch, props, result)
                self._connection.ioloop.add_callback_threadsafe(cb)


Hope this helps someone!
Using RPC, async consumers with batching and also aio-pika RPC code for publishing turned out more complex than I expected!



Nevermind

Jan 7, 2020, 4:02:33 AM
to Pika
I totally agree with Alexander Veysov.

In my do_work() method, I called self._channel.basic_reject(delivery_tag) in some cases without wrapping it in a thread-safe callback. Maybe this blocked the heartbeat signal.

I wrapped it as shown below and have not seen the exception so far:

def safe_reject(self, delivery_tag):
    cb = functools.partial(self.reject_message, delivery_tag)
    self._connection.ioloop.add_callback_threadsafe(cb)

Hopefully, it works. Many thanks to Luke Bakken for his answers.


On Tuesday, January 7, 2020 at 14:22:57 UTC+7, Alexander Veysov wrote:

Vitaly Krug

Jan 7, 2020, 10:36:19 AM
to pika-...@googlegroups.com
Doing non-threadsafe things to a connection/channel will trigger race conditions, which lead to bad things.


Alexander Veysov

Jan 9, 2020, 3:30:12 AM
to Pika
I tested my code a bit more, and so far it seems stable!
Nothing earth-shattering there, but maybe we could make a public example out of this for the examples folder (https://github.com/pika/pika/tree/master/examples)?

I faced the following hurdles in my work with RabbitMQ / pika:

- Almost zero information on batching messages properly
- Scattered information on this topic
- Very scattered information on how to combine RPC with async consumers / batches / custom events 

So, Luke, to help the next wave of users use pika properly, could you perhaps take my example, polish it, and publish it in the examples folder of the main repo?
I took the liberty of using a more modern library for logging, but otherwise the example is just based on asynchronous_consumer_example.py.
Note that I used proper threadsafe stuff only for the RPC flag in my code, because in other cases performance does not really matter (but that can be easily rewritten).


async_batch_consumer.py

Luke Bakken

Feb 6, 2020, 11:14:05 AM
to Pika
Hello,

If you have time, open a pull request with your suggestion. I am busy working on higher-priority RabbitMQ items at this time.

Thanks,
Luke