Why does the pika connection get dropped when publishing at irregular intervals


Ionuț-Alexandru Baltariu

unread,
Sep 9, 2022, 10:46:15 AM
to rabbitmq-users
The scenario is the following: I receive messages on one queue, do a bit of processing and then send a message on another queue.

```
import logging
import os

from pika import BlockingConnection, ConnectionParameters, PlainCredentials

logger = logging.getLogger(__name__)

credentials = PlainCredentials("test", "test")
publisher_credentials = PlainCredentials("test", "test")
connection = BlockingConnection(ConnectionParameters("host1", 1234, "/", credentials))
publisher_connection = BlockingConnection(ConnectionParameters("host2", 1234, "/", publisher_credentials))

channel, publisher_channel = connection.channel(), publisher_connection.channel()
publisher_channel.queue_declare(queue="testqueue", passive=True)
publisher_channel.confirm_delivery()
callback_fct = generate_callback(publisher_channel)

channel.basic_consume(queue=os.getenv("RABBIT_MQ_QNAME"), on_message_callback=callback_fct, auto_ack=True)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
    connection.close()
except Exception:
    logger.exception("An unexpected error has occurred!")
```
And the `generate_callback` function would do something like this:
```
from pika import BasicProperties, DeliveryMode


def generate_callback(publisher):
    def on_message(channel, method_frame, header_frame, body):
        logger.debug(f"Received {body}")
        # assume some processing is done here, it should be really fast (under one second)
        publisher.basic_publish(
            exchange="",
            routing_key="test",
            body="random_string",
            properties=BasicProperties(content_type="text/plain", delivery_mode=DeliveryMode.Persistent),
        )

    return on_message
```
Publishing works, but if I do not receive a message in my consumer queue for a couple of minutes, it seems that the publisher connection is lost:

```
ERROR - Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
```

I do not understand what I have to do in order to prevent the connection from being lost. In my current implementation I am automatically recreating the connection, but I would want to prevent this, at least in the cases in which nothing is received for a couple of minutes. What am I missing?

Luke Bakken

unread,
Sep 10, 2022, 9:13:12 PM
to rabbitmq-users
Hello,

You are using two instances of BlockingConnection on the same thread. They are guaranteed to interfere with each other.

Move them to separate Python threads.

You can use the same connection, but be warned that if RabbitMQ blocks your publisher it means that your consumer may not be able to ack messages, either.

Ionuț-Alexandru Baltariu

unread,
Sep 11, 2022, 1:38:03 AM
to rabbitmq-users
Hello, thanks for the answer!

Oh, I see. In the current situation I don't really visualise how I should structure the code so that both connections are used on different threads, because publishing a message depends on first receiving one.
I assume that creating threads for every received message is not a solution, because I understood you to mean that even the connection object instantiation has to be done on a separate thread.

Using the same connection cannot work, since they are separate RabbitMQ brokers. And I assume that using a SelectConnection for publishing messages wouldn't be that easy either.

I haven't seen any official examples in the pika repository with this kind of situation - is it that rare? Do people just use the same broker in production? Wouldn't that cause some messages to just be lost?

Luke Bakken

unread,
Sep 11, 2022, 10:18:20 AM
to rabbitmq-users
Hello,


 I don't really visualise how I should structure the code so that both connections are used on different threads, because publishing a message depends on first receiving one.

You start two threads, each of which sets up its own BlockingConnection. Thread 1 connects to server A and begins consuming messages.
Thread 2 (the publishing thread) connects to server B and uses process_data_events to keep the thread alive (https://pika.readthedocs.io/en/stable/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingConnection.process_data_events).

Then, when Thread 1 receives a message, it calls add_callback_threadsafe on Thread 2's connection to schedule a function that publishes the message on Thread 2.
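The shape of that hand-off can be sketched without a broker. In this runnable stand-in (all names are mine, not pika's), a `queue.Queue` of callables plays the role of `add_callback_threadsafe` plus the `process_data_events` loop, to make clear which thread owns which connection:

```python
import queue
import threading

# Stand-in for add_callback_threadsafe: callables dropped into this queue
# are executed on the publisher thread, just as pika runs scheduled
# callbacks inside process_data_events().
scheduled = queue.Queue()
published = []
STOP = object()  # sentinel to shut the publisher thread down

def publisher_thread():
    # In real code this thread would own the BlockingConnection to server B
    # and loop on connection.process_data_events(time_limit=1).
    while True:
        callback = scheduled.get()
        if callback is STOP:
            break
        callback()  # runs on this thread, where the connection would live

def consumer_thread():
    # In real code this thread would own the BlockingConnection to server A,
    # and this loop body would be the basic_consume callback.
    for body in ("msg-1", "msg-2", "msg-3"):
        # Never touch the publisher connection from this thread; instead,
        # schedule the publish to run on the thread that owns it.
        scheduled.put(lambda b=body: published.append(b))
    scheduled.put(STOP)

pub = threading.Thread(target=publisher_thread)
con = threading.Thread(target=consumer_thread)
pub.start()
con.start()
con.join()
pub.join()
print(published)  # → ['msg-1', 'msg-2', 'msg-3']
```

Replacing `published.append` with a `basic_publish` call on Thread 2's channel, and the queue with `add_callback_threadsafe`, gives the structure Luke describes.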


You should start work on your code and share it via GitHub. I can assist you.

I haven't seen any official examples in the pika repository with this kind of situation - is it that rare? Do people just use the same broker in production? Wouldn't that cause some messages to just be lost?

Everyone's use-case is different. Consuming from one broker and publishing to another is not as common as a single broker.

Multiple message brokers are no more or less likely to lose messages. You must code your application to prevent lost messages by using publisher confirmations and delivery acknowledgements correctly.

Thanks,
Luke 

Ionuț-Alexandru Baltariu

unread,
Sep 13, 2022, 7:20:48 AM
to rabbitmq-users
Hello!

I have abandoned the idea of two different brokers and started developing the same features for a single broker.
Is the best practice to have one channel for publishing and another for consuming, both on the same BlockingConnection running on the main thread?

Luke Bakken

unread,
Sep 13, 2022, 8:26:10 AM
to rabbitmq-users
I have abandoned the idea of two different brokers and started developing the same features for a single broker.
Is the best practice to have one channel for publishing and another for consuming, both on the same BlockingConnection running on the main thread?

No, you really should publish and consume using different connections. The reason is that if you are using the same connection, and RabbitMQ decides to throttle your publisher, it will slow down or stop reading from that TCP port. This also means that your consumer is effectively blocked because RabbitMQ won't be reading the consumer acks.

If your message rates are low and you are unlikely to hit RabbitMQ alarms, then go right ahead and use the same connection. Just be aware of this situation.

Again, I'm happy to help out with a multi-threaded solution, because it could later be incorporated into the examples provided in the Pika GitHub repository.

Thanks,
Luke