I am a beginner to the rabbitmq. I am using rabbitmq to communicate between two different services. During high load, I saw the following error after every ~5 mins.
2021-08-31 23:26:41,433 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 40082)>
2021-08-31 23:26:41,433 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
raise self._closed_result.value.error
logging.error(
2021-08-31 23:31:14,431 ERROR pika.adapters.utils.io_services_utils _on_socket_readable: _AsyncBaseTransport._consume() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 42412)>; Caller's stack:
2021-08-31 23:31:14,431 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 42412)>
2021-08-31 23:31:14,432 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
raise self._closed_result.value.error
logging.error(
2021-08-31 23:36:48,927 ERROR pika.adapters.utils.io_services_utils _on_socket_writable: _AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 44692)>; Caller's stack:
2021-08-31 23:36:48,927 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 44692)>
2021-08-31 23:36:48,927 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
raise self._closed_result.value.error
logging.error(
2021-08-31 23:42:27,403 ERROR pika.adapters.utils.io_services_utils _on_socket_writable: _AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 47474)>; Caller's stack:
2021-08-31 23:42:27,404 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 47474)>
2021-08-31 23:42:27,404 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
raise self._closed_result.value.error
logging.error(
import logging
import os
import threading
import pika
from pika import BasicProperties
class Rabbitmq:
"""
This class contains methods for getting a Rabbit MQ connection.
"""
def __init__(self):
self.connection = None
self.rabbitmq_host = os.environ.get('RABBITMQ_HOST')
# create a single connection to rabbitmq. It is kept alive
# using heartbeat
if self.rabbitmq_host:
self.connection = self.new_connection()
def new_connection(self):
return pika.BlockingConnection(
pika.ConnectionParameters(
host=self.rabbitmq_host, heartbeat=600, blocked_connection_timeout=300
)
)
def is_enabled(self):
return True if self.rabbitmq_host else False
def get_connection(self):
if self.connection:
if self.connection.is_closed:
logging.info('re-initalizing rabbitmq connection')
self.connection = self.new_connection()
return self.connection
else:
raise Exception('connection not created. Please set RABBITMQ_HOST')
class FanoutRabbitConsumer:
def __init__(self, queue, exchange):
self.exchange = exchange
self.queue = queue
self.rabbit_mq = Rabbitmq()
self.host = os.environ['RABBITMQ_HOST']
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host, heartbeat=600, blocked_connection_timeout=300
)
)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout')
self.channel.queue_declare(queue=self.queue, durable=True)
self.channel.queue_bind(exchange=self.exchange, queue=self.queue)
def publish(self, body):
self.channel.basic_publish(exchange=self.exchange, body=body, routing_key='')
class FanoutRabbitPublisher:
def __init__(self, queue, exchange):
self.exchange = exchange
self.queue = queue
self.rabbit_mq = Rabbitmq()
self.host = os.environ['RABBITMQ_HOST']
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host, heartbeat=600, blocked_connection_timeout=300
)
)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout')
# self.channel.queue_declare(queue=self.queue, durable=True)
# self.channel.queue_bind(exchange=self.exchange, queue=self.queue)
def publish(self, body):
self.channel.basic_publish(exchange=self.exchange, body=body, routing_key='')
class RabbitmqConsumer:
"""
This helper class contains methods for consuming messages off the
rabbitmq service.
"""
def __init__(self, queue, exchange, exchange_type='direct', exclusive=False):
""" Initializes rabbit consumer with given exclusive values. If exclusive=True
the queue will be deleted once the connection closes.
"""
self.rabbit_mq = Rabbitmq()
self.exclusive = exclusive
logging.info(f'Initing {queue}')
self.queue = queue
self.exchange = exchange
self.exchange_type = exchange_type
def start(self):
"""
Start rabbitmq listener
"""
if self.rabbit_mq.is_enabled():
thread = threading.Thread(target=self._start_consuming_rabbit_mq)
thread.setDaemon(True)
thread.start()
else:
logging.info('Rabbit MQ not configured')
def _start_consuming_rabbit_mq(self):
"""
Start rabbit mq threads. This method should be called from a
demon thread. start_consuming() blocks for ever
"""
while True:
try:
channel = self.rabbit_mq.new_connection().channel()
channel.exchange_declare(
exchange=self.exchange, exchange_type=self.exchange_type
)
channel.queue_declare(
queue=self.queue, durable=True, exclusive=self.exclusive
)
channel.queue_bind(exchange=self.exchange, queue=self.queue)
channel.basic_consume(
queue=self.queue,
on_message_callback=self.callback,
auto_ack=True, # TODO check auto_ack
)
logging.info('Starting Rabbit MQ listener')
channel.start_consuming()
except Exception:
logging.error(
'rabbitmq consumer failed, restarting...', 'exc_info=True'
)
def callback(self, channel, method, properties, body):
"""
RabbitMQ calls this method when it gets on a message. Override this.
"""
pass
def close(self):
"""
close connection
"""
self.rabbit_mq.get_connection().close()
class RabbitmqPublisher(Rabbitmq):
"""
This helper class contains methods for publishing messages to the
rabbitmq service.
"""
def __init__(self, config, queue, exchange, exchange_type='direct'):
super(RabbitmqPublisher, self).__init__()
self.config = config
self.rabbit_mq = Rabbitmq()
self.queue = queue
self.exchange = exchange
self.exchange_type = exchange_type
self.channel = None
if self.connection:
self.channel = self.connection.channel()
def connect(self):
"""
wrapper method around the parent's new_connection(). This method also
reinitializes and sets up the channel.
"""
if not self.is_enabled():
raise Exception('connection not created. Please set RABBITMQ_HOST')
if (
not self.connection
or self.connection.is_closed
or not self.channel
or self.channel.is_closed
):
connection = self.new_connection()
channel = connection.channel()
channel.exchange_declare(
exchange=self.exchange, exchange_type=self.exchange_type
)
channel.queue_declare(queue=self.queue, durable=True)
channel.queue_bind(exchange=self.exchange, queue=self.queue)
self.connection = connection
self.channel = channel
def _publish(self, message):
"""
Publish the message.
"""
if self.rabbit_mq.is_enabled():
channel = self.channel
channel.basic_publish(
exchange=self.exchange,
routing_key=self.queue,
body=message,
properties=BasicProperties(delivery_mode=2),
)
else:
logging.info('Rabbit MQ not configured')
def publish(self, message):
"""
Publish the message.
"""
try:
self._publish(message)
except (
pika.exceptions.ConnectionClosed,
pika.exceptions.StreamLostError,
pika.exceptions.ChannelWrongStateError,
pika.exceptions.ChannelError,
):
self.connect() # re-initialize.
self._publish(message)
def close(self):
"""
close connection
"""
self.rabbit_mq.get_connection().close()
1. When connection reset happens, does it lose the message?
2. How can I improve the code to handle large loads?
Please let me know if additional information is required.