Rabbitmq Consumer Connection Reset

877 views
Skip to first unread message

Nihar Patil

unread,
Sep 1, 2021, 10:22:11 AM9/1/21
to rabbitmq-users

Hi,

I am a beginner to the rabbitmq. I am using rabbitmq to communicate between two different services. During high load, I saw the following error after every ~5 mins.


```
2021-08-31 23:26:41,433 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 40082)>
2021-08-31 23:26:41,433 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
    raise self._closed_result.value.error
    logging.error(
2021-08-31 23:31:14,431 ERROR pika.adapters.utils.io_services_utils _on_socket_readable: _AsyncBaseTransport._consume() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 42412)>; Caller's stack:
2021-08-31 23:31:14,431 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 42412)>
2021-08-31 23:31:14,432 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
    raise self._closed_result.value.error
    logging.error(
2021-08-31 23:36:48,927 ERROR pika.adapters.utils.io_services_utils _on_socket_writable: _AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 44692)>; Caller's stack:
2021-08-31 23:36:48,927 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 44692)>
2021-08-31 23:36:48,927 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
    raise self._closed_result.value.error
    logging.error(
2021-08-31 23:42:27,403 ERROR pika.adapters.utils.io_services_utils _on_socket_writable: _AsyncBaseTransport._produce() failed, aborting connection: error=ConnectionResetError(104, 'Connection reset by peer'); sock=<socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 47474)>; Caller's stack:
2021-08-31 23:42:27,404 INFO pika.adapters.utils.io_services_utils _initiate_abort: _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=ConnectionResetError(104, 'Connection reset by peer'); <socket.socket fd=73, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.10.124.247', 47474)>
2021-08-31 23:42:27,404 INFO pika.connection _on_stream_terminated: AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')",); pending-error=None
--- Logging error ---
    raise self._closed_result.value.error
    logging.error(
```

Code
```
import logging
import os
import threading

import pika
from pika import BasicProperties


class Rabbitmq:
    """
    This class contains methods for getting a Rabbit MQ connection.
    """

    def __init__(self):
        self.connection = None
        self.rabbitmq_host = os.environ.get('RABBITMQ_HOST')
        # create a single connection to rabbitmq. It is kept alive
        # using heartbeat
        if self.rabbitmq_host:
            self.connection = self.new_connection()

    def new_connection(self):
        return pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.rabbitmq_host, heartbeat=600, blocked_connection_timeout=300
            )
        )

    def is_enabled(self):
        return True if self.rabbitmq_host else False

    def get_connection(self):
        if self.connection:
            if self.connection.is_closed:
                logging.info('re-initalizing rabbitmq connection')
                self.connection = self.new_connection()
            return self.connection
        else:
            raise Exception('connection not created. Please set RABBITMQ_HOST')


class FanoutRabbitConsumer:
    def __init__(self, queue, exchange):
        self.exchange = exchange
        self.queue = queue
        self.rabbit_mq = Rabbitmq()
        self.host = os.environ['RABBITMQ_HOST']
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host, heartbeat=600, blocked_connection_timeout=300
            )
        )
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout')
        self.channel.queue_declare(queue=self.queue, durable=True)
        self.channel.queue_bind(exchange=self.exchange, queue=self.queue)

    def publish(self, body):
        self.channel.basic_publish(exchange=self.exchange, body=body, routing_key='')


class FanoutRabbitPublisher:
    def __init__(self, queue, exchange):
        self.exchange = exchange
        self.queue = queue
        self.rabbit_mq = Rabbitmq()
        self.host = os.environ['RABBITMQ_HOST']
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host, heartbeat=600, blocked_connection_timeout=300
            )
        )
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout')
        # self.channel.queue_declare(queue=self.queue, durable=True)
        # self.channel.queue_bind(exchange=self.exchange, queue=self.queue)

    def publish(self, body):
        self.channel.basic_publish(exchange=self.exchange, body=body, routing_key='')


class RabbitmqConsumer:
    """
    This helper class contains methods for consuming messages off the
    rabbitmq service.
    """

    def __init__(self, queue, exchange, exchange_type='direct', exclusive=False):
        """ Initializes rabbit consumer with given exclusive values. If exclusive=True
        the queue will be deleted once the connection closes.
        """
        self.rabbit_mq = Rabbitmq()
        self.exclusive = exclusive
        logging.info(f'Initing {queue}')
        self.queue = queue
        self.exchange = exchange
        self.exchange_type = exchange_type

    def start(self):
        """
        Start rabbitmq listener
        """
        if self.rabbit_mq.is_enabled():
            thread = threading.Thread(target=self._start_consuming_rabbit_mq)
            thread.setDaemon(True)
            thread.start()
        else:
            logging.info('Rabbit MQ not configured')

    def _start_consuming_rabbit_mq(self):
        """
        Start rabbit mq threads. This method should be called from a
        demon thread. start_consuming() blocks for ever
        """
        while True:
            try:
                channel = self.rabbit_mq.new_connection().channel()
                channel.exchange_declare(
                    exchange=self.exchange, exchange_type=self.exchange_type
                )
                channel.queue_declare(
                    queue=self.queue, durable=True, exclusive=self.exclusive
                )
                channel.queue_bind(exchange=self.exchange, queue=self.queue)

                channel.basic_consume(
                    queue=self.queue,
                    on_message_callback=self.callback,
                    auto_ack=True,  # TODO check auto_ack
                )
                logging.info('Starting Rabbit MQ listener')
                channel.start_consuming()
            except Exception:
                logging.error(
                    'rabbitmq consumer failed, restarting...', 'exc_info=True'
                )

    def callback(self, channel, method, properties, body):
        """
        RabbitMQ calls this method when it gets on a message. Override this.
        """
        pass

    def close(self):
        """
        close connection
        """
        self.rabbit_mq.get_connection().close()


class RabbitmqPublisher(Rabbitmq):
    """
    This helper class contains methods for publishing messages to the
    rabbitmq service.
    """

    def __init__(self, config, queue, exchange, exchange_type='direct'):
        super(RabbitmqPublisher, self).__init__()
        self.config = config
        self.rabbit_mq = Rabbitmq()
        self.queue = queue
        self.exchange = exchange
        self.exchange_type = exchange_type

        self.channel = None
        if self.connection:
            self.channel = self.connection.channel()

    def connect(self):
        """
        wrapper method around the parent's new_connection(). This method also
        reinitializes and sets up the channel.
        """
        if not self.is_enabled():
            raise Exception('connection not created. Please set RABBITMQ_HOST')

        if (
            not self.connection
            or self.connection.is_closed
            or not self.channel
            or self.channel.is_closed
        ):
            connection = self.new_connection()
            channel = connection.channel()
            channel.exchange_declare(
                exchange=self.exchange, exchange_type=self.exchange_type
            )
            channel.queue_declare(queue=self.queue, durable=True)
            channel.queue_bind(exchange=self.exchange, queue=self.queue)

            self.connection = connection
            self.channel = channel

    def _publish(self, message):
        """
        Publish the message.
        """
        if self.rabbit_mq.is_enabled():
            channel = self.channel
            channel.basic_publish(
                exchange=self.exchange,
                routing_key=self.queue,
                body=message,
                properties=BasicProperties(delivery_mode=2),
            )
        else:
            logging.info('Rabbit MQ not configured')

    def publish(self, message):
        """
        Publish the message.
        """
        try:
            self._publish(message)
        except (
            pika.exceptions.ConnectionClosed,
            pika.exceptions.StreamLostError,
            pika.exceptions.ChannelWrongStateError,
            pika.exceptions.ChannelError,
        ):
            self.connect()  # re-initialize.
            self._publish(message)

    def close(self):
        """
        close connection
        """
        self.rabbit_mq.get_connection().close()

```
I have the following questions:
1. When connection reset happens, does it lose the message?
2. How can I improve the code to handle large loads?

Please let me know if additional information is required.

Thanks,
Nihar
Reply all
Reply to author
Forward
0 new messages