Ionuț-Alexandru Baltariu
Sep 9, 2022, 10:46:15 AM
to rabbitmq-users
The scenario is the following: I receive messages on one queue, do a bit of processing, and then publish a message to another queue over a separate connection to a different host.
```
import logging
import os

from pika import BlockingConnection, ConnectionParameters, PlainCredentials

logger = logging.getLogger(__name__)

credentials = PlainCredentials("test", "test")
publisher_credentials = PlainCredentials("test", "test")
# The consumer and the publisher use separate connections to two different hosts.
connection = BlockingConnection(ConnectionParameters("host1", 1234, "/", credentials))
publisher_connection = BlockingConnection(ConnectionParameters("host2", 1234, "/", publisher_credentials))
channel, publisher_channel = connection.channel(), publisher_connection.channel()
# passive=True only checks that the queue already exists.
publisher_channel.queue_declare(queue="testqueue", passive=True)
publisher_channel.confirm_delivery()
callback_fct = generate_callback(publisher_channel)
channel.basic_consume(queue=os.getenv("RABBIT_MQ_QNAME"), on_message_callback=callback_fct, auto_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
    connection.close()
except Exception:
    logger.exception("An unexpected error has occurred!")
```
And the `generate_callback` function would do something like this:
```
from pika import BasicProperties, DeliveryMode

def generate_callback(publisher):
    def on_message(channel, method_frame, header_frame, body):
        logger.debug(f"Received {body}")
        # assume some processing is done here, it should be really fast (under one second)
        properties = BasicProperties(content_type="text/plain", delivery_mode=DeliveryMode.Persistent)
        # Publish via the default exchange, using "test" as the routing key.
        publisher.basic_publish(exchange="", routing_key="test", body="random_string", properties=properties)
    return on_message
```
Publishing works, but if I do not receive a message in my consumer queue for a couple of minutes, it seems that the publisher connection is lost:
```
ERROR - Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
```
I do not understand what I have to do to keep the connection from being lost. My current implementation recreates the connection automatically, but I would like to avoid the reconnect, at least when nothing has been received for a couple of minutes. What am I missing?
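A possible explanation, assuming the reset comes from missed AMQP heartbeats (not confirmed here): `start_consuming()` only drives I/O for the consumer connection, so the idle publisher `BlockingConnection` never gets a chance to send heartbeats and the broker eventually drops it. Below is a minimal sketch of one workaround under that assumption. `keep_publisher_alive` and its `interval` parameter are hypothetical names introduced for illustration, while `call_later` and `process_data_events` are existing `BlockingConnection` methods.

```python
def keep_publisher_alive(consumer_connection, publisher_connection, interval=30):
    """Service the idle publisher connection from the consumer's I/O loop.

    Hypothetical helper: both arguments are assumed to be pika
    BlockingConnection objects that are used from the same thread.
    """

    def service_publisher():
        # Give the publisher connection a chance to process pending frames,
        # which includes sending and receiving heartbeats.
        publisher_connection.process_data_events(time_limit=0)
        # Re-arm the timer so this runs again after `interval` seconds.
        consumer_connection.call_later(interval, service_publisher)

    # Schedule the first run; the timer fires while the consumer
    # connection is processing I/O, e.g. inside start_consuming().
    consumer_connection.call_later(interval, service_publisher)
```

It would be called once, as `keep_publisher_alive(connection, publisher_connection)`, before `channel.start_consuming()`. The `interval` should probably stay well below the heartbeat timeout negotiated with the broker.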