RMQ Asynchronous consumer: Bad file descriptor error.


Aswin Roy

Jul 6, 2017, 2:20:12 AM
to rabbitmq-users

Hi,

I have an asynchronous consumer implemented exactly like this one. In the main method, I have a multiprocessing Pool initialised along with a Queue (multiprocessing) which is passed on to it. Then, I pass on the Queue as a parameter to the init method of ExampleConsumer. Now, when a message is consumed by the consumer, inside the on_message method, I put the body of the message as a String into the Queue. This helps my time-consuming tasks run independently of the main process (which is the consumer).

def main():
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    my_queue = multiprocessing.Queue()
    my_pool = multiprocessing.Pool(2, my_class().my_method, (my_queue,))
    example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F', my_queue)
    try:
        example.run()
        my_pool.close()
        my_pool.join()
    except KeyboardInterrupt:
        my_pool.terminate()
        example.stop()

And, inside init method,

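    def __init__(self, amqp_url, queue):
        """Create a new instance of the consumer class, passing in the AMQP
        URL used to connect to RabbitMQ and the queue that receives message
        bodies.

        :param str amqp_url: The AMQP url to connect with
        :param multiprocessing.Queue queue: Shared queue that on_message
            puts each message body on

        """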
        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._url = amqp_url
        self.queue = queue

then, inside on_message,

    def on_message(self, unused_channel, basic_deliver, properties, body):
        """Invoked by pika when a message is delivered from RabbitMQ. The
        channel is passed for your convenience. The basic_deliver object that
        is passed in carries the exchange, routing key, delivery tag and
        a redelivered flag for the message. The properties passed in is an
        instance of BasicProperties with the message properties and the body
        is the message that was sent.

        :param pika.channel.Channel unused_channel: The channel object
        :param pika.Spec.Basic.Deliver: basic_deliver method
        :param pika.Spec.BasicProperties: properties
        :param str|unicode body: The message body

        """
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)
        self.acknowledge_message(basic_deliver.delivery_tag)
        self.queue.put(str(body))
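
For readers who want to try the hand-off on its own, here is a minimal sketch of the pattern described above (a callback puts message bodies on a multiprocessing queue and worker processes drain it), with no pika involved. It is not the code from this post: the names worker, run_workers and STOP are hypothetical, the "work" is just upper-casing the body, and the "fork" start method is an assumption that only holds on Unix-like systems.

```python
import multiprocessing as mp

STOP = None  # hypothetical sentinel telling a worker to exit


def worker(task_queue, result_queue):
    # Drain the queue until the sentinel arrives; stands in for the slow task.
    for body in iter(task_queue.get, STOP):
        result_queue.put(body.upper())


def run_workers(bodies, n_workers=2):
    # Assumption: Unix-like OS, so the "fork" start method is available.
    ctx = mp.get_context("fork")
    tasks, results = ctx.Queue(), ctx.Queue()
    procs = [ctx.Process(target=worker, args=(tasks, results))
             for _ in range(n_workers)]
    for proc in procs:
        proc.start()
    for body in bodies:          # what on_message would do with each body
        tasks.put(body)
    for _ in procs:              # one sentinel per worker
        tasks.put(STOP)
    # Collect results before joining so no worker blocks on a full pipe.
    out = [results.get() for _ in bodies]
    for proc in procs:
        proc.join()
    return sorted(out)
```

Only plain strings cross the process boundary here; the connection and channel objects stay in the consuming process.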

The program runs without any issues initially but then crashes with the following error after a while:

File "consumer.py", line 552, in <module>
    main()
  File "consumer.py", line 541, in main
    rmq_consumer.run()
  File "consumer.py", line 500, in run
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 355, in start
    self.process_timeouts()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 283, in process_timeouts
    timer['callback']()
  File "consumer.py", line 290, in reconnect
    self._connection.ioloop.start()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 354, in start
    self.poll()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 602, in poll
    self._process_fd_events(fd_event_map, write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 443, in _process_fd_events
    handler(fileno, events, write_only=write_only)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 364, in _handle_events
    self._handle_read()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 415, in _handle_read
    self._on_data_available(data)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1347, in _on_data_available
    self._process_frame(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1427, in _process_frame
    self._deliver_frame_to_channel(frame_value)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1028, in _deliver_frame_to_channel
    return self._channels[value.channel_number]._handle_content_frame(value)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 896, in _handle_content_frame
    self._on_deliver(*response)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 983, in _on_deliver
    header_frame.properties, body)
  File "consumer.py", line 452, in on_message
    self.acknowledge_message(basic_deliver.delivery_tag)
  File "consumer.py", line 463, in acknowledge_message
    self._channel.basic_ack(delivery_tag)
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 159, in basic_ack
    return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
  File "/usr/local/lib/python2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1569, in _send_method
    self._send_frame(frame.Method(channel_number, method_frame))
  File "/usr/local/lib/python2.7/site-packages/pika/connection.py", line 1554, in _send_frame
    self._flush_outbound()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 282, in _flush_outbound
    self._handle_write()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 452, in _handle_write
    return self._handle_error(error)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 338, in _handle_error
    self._handle_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/base_connection.py", line 288, in _handle_disconnect
    self._adapter_disconnect()
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 94, in _adapter_disconnect
    self.ioloop.remove_handler(self.socket.fileno())
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 579, in remove_handler
    super(PollPoller, self).remove_handler(fileno)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 328, in remove_handler
    self.update_handler(fileno, 0)
  File "/usr/local/lib/python2.7/site-packages/pika/adapters/select_connection.py", line 571, in update_handler
    self._poll.modify(fileno, events)
IOError: [Errno 9] Bad file descriptor

The rabbitmq UI says that there are enough file descriptors available. I understand that pika is not thread-safe, but I am not passing the rmq connections across threads/processes. The only suspicious thing I do is pass the shared Queue to the consumer class. Could this be a problem?

I am unable to get any leads on why this might be happening. Any help is appreciated! If any code sample is required, please let me know.

Thanks.

Michael Klishin

Jul 6, 2017, 5:17:02 AM
to rabbitm...@googlegroups.com
"IOError: [Errno 9] Bad file descriptor" happens in your program after it's been
running for some time, so unless it continuously opens new RabbitMQ connections,
the server file handle limit is unlikely to be relevant.

We cannot comment on whether Pika is "thread safe" without knowing what exactly
your code does. Most clients require explicit synchronisation when a channel is shared
for most operations but I'd not get too attached to this theory.

See server logs and if you have a way to reproduce, take a traffic capture
(https://www.rabbitmq.com/amqp-wireshark.html). That will provide a lot more information
to work with.

I wouldn't be surprised if Python throws this when any operation is attempted on a previously
closed socket — server logs and traffic capture should make it pretty obvious whether the
server might have closed a connection (e.g. due to unrecoverable protocol exception,
which can happen due to a concurrency hazard).
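
A quick way to see the "operation on a previously closed socket" case in isolation is the sketch below. It uses only the standard library, not pika, and the function name errno_after_close is hypothetical. It remembers a raw descriptor the way an I/O loop would, closes the socket, and then tries to use the stale descriptor. On Python 3 the exception is OSError; Python 2.7, as in the traceback above, reports the same condition as IOError.

```python
import errno
import os
import socket


def errno_after_close():
    """Return the errno raised when writing to an already closed descriptor."""
    left, right = socket.socketpair()
    fd = left.fileno()   # keep the raw descriptor, as an I/O loop would
    left.close()         # the descriptor is now invalid
    try:
        os.write(fd, b"ack")
    except OSError as exc:
        return exc.errno
    finally:
        right.close()
    return None
```

Comparing the return value with errno.EBADF shows that this is the same "[Errno 9] Bad file descriptor" condition, which supports reading the error as a symptom of an earlier disconnect rather than as a descriptor limit being hit.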


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Aswin Roy

Jul 10, 2017, 8:32:48 AM
to rabbitmq-users
Hi,

The simple rmq server logs don't seem to say much except that the connection got closed [is there any way we can analyse the reason for this from the server side?]. Taking a capture doesn't seem feasible as I have no idea how to reproduce the error.

Do you think maybe I should try using a different library (a thread-safe one) like rabbitpy and test? I am out of other ideas to debug. 

Cheers

Michael Klishin

Jul 10, 2017, 11:05:46 AM
to rabbitm...@googlegroups.com
It would help immensely if instead of paraphrasing what the log said you could paste it
verbatim (feel free to edit out IP addresses).

There is not enough information to suggest whether this is a library issue or something else
(such as your own code).

Wireshark is by far the best source of information around connection activity (see the traffic capture guide linked earlier in this thread).

rabbitpy builds on top of Pika as they are from the same author.


Aswin Roy

Jul 14, 2017, 10:29:35 AM
to rabbitmq-users
Sorry about the delay in the reply!

I am attaching below lines from the rmq server logs.

=INFO REPORT==== 14-Jul-2017::13:21:51 ===
accepting AMQP connection <0.30381.6989> (<ip>:40986 -> <ip>:5672)

=ERROR REPORT==== 14-Jul-2017::13:24:51 ===
closing AMQP connection <0.30381.6989> (<ip>:40986 -><ip>:5672):
missed heartbeats from client, timeout: 60s

=WARNING REPORT==== 14-Jul-2017::13:26:10 ===
closing AMQP connection <0.2740.6989> (<ip>:40004 -> <ip>:5672):
client unexpectedly closed TCP connection

Also:

=WARNING REPORT==== 10-Jul-2017::09:24:01 ===
closing AMQP connection <0.31458.6899> (v:41764 -> <ip>:5672):
client unexpectedly closed TCP connection

=ERROR REPORT==== 10-Jul-2017::09:27:34 ===
Error on AMQP connection <0.30419.6899> (<ip>:41618 -> <ip>:5672, vhost: '/', user: '*****', state: running), channel 1:
operation basic.publish caused a connection exception unexpected_frame: "expected content header for class 60, got non content header frame instead"

=ERROR REPORT==== 10-Jul-2017::09:46:29 ===
closing AMQP connection <0.8025.6900> (<ip>:45110 -> <ip>:5672):
missed heartbeats from client, timeout: 60s

Thank you!

On Thursday, July 6, 2017 at 11:50:12 AM UTC+5:30, Aswin Roy wrote:

Michael Klishin

Jul 14, 2017, 10:35:04 AM
to rabbitm...@googlegroups.com
On Fri, Jul 14, 2017 at 5:29 PM, Aswin Roy <aswinj...@gmail.com> wrote:
Sorry about the delay in the reply!

I am attaching below lines from the rmq server logs.

=INFO REPORT==== 14-Jul-2017::13:21:51 ===
accepting AMQP connection <0.30381.6989> (<ip>:40986 -> <ip>:5672)


A client connection was successfully accepted.
 
=ERROR REPORT==== 14-Jul-2017::13:24:51 ===
closing AMQP connection <0.30381.6989> (<ip>:40986 -><ip>:5672):
missed heartbeats from client, timeout: 60s


and then considered to be dead after a timeout (see http://www.rabbitmq.com/heartbeats.html)
 
=WARNING REPORT==== 14-Jul-2017::13:26:10 ===
closing AMQP connection <0.2740.6989> (<ip>:40004 -> <ip>:5672):
client unexpectedly closed TCP connection


Here a client closed the TCP connection without closing the AMQP 0-9-1 connection
(we do not know why: it could be a small issue in the code where a call to Connection#close
is missing, or the client failed, was killed by the OOM killer, or a proxy decided
that the TCP connection was inactive and should be closed).
 
Also:

=WARNING REPORT==== 10-Jul-2017::09:24:01 ===
closing AMQP connection <0.31458.6899> (v:41764 -> <ip>:5672):
client unexpectedly closed TCP connection

=ERROR REPORT==== 10-Jul-2017::09:27:34 ===
Error on AMQP connection <0.30419.6899> (<ip>:41618 -> <ip>:5672, vhost: '/', user: '*****', state: running), channel 1:
operation basic.publish caused a connection exception unexpected_frame: "expected content header for class 60, got non content header frame instead"


Framing errors such as this stem from channels being shared between threads,
which leads to incorrect frame interleaving on the wire. This has been discussed many
times before on this list.
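
To make the interleaving concrete, here is a toy model; it is not pika and sends no real AMQP traffic. It assumes that one basic.publish goes out as a method frame, a content header frame and a body frame that must stay contiguous on a channel (class 60 in the error above is the "basic" class). The names publish, is_well_framed and publish_from_threads are hypothetical, and the "wire" is just a Python list.

```python
import threading

FRAMES = ("method", "header", "body")  # one publish = three frames, in order


def publish(wire, channel_lock, message_id):
    # Holding the lock keeps the three frames of one publish together.
    with channel_lock:
        for kind in FRAMES:
            wire.append((kind, message_id))


def is_well_framed(wire):
    """True if the wire is a sequence of complete method/header/body triples."""
    if len(wire) % len(FRAMES):
        return False
    for start in range(0, len(wire), len(FRAMES)):
        group = wire[start:start + len(FRAMES)]
        kinds = tuple(kind for kind, _ in group)
        ids = {message_id for _, message_id in group}
        if kinds != FRAMES or len(ids) != 1:
            return False
    return True


def publish_from_threads(n_threads=4, per_thread=200):
    wire, channel_lock = [], threading.Lock()

    def worker(thread_no):
        for i in range(per_thread):
            publish(wire, channel_lock, (thread_no, i))

    threads = [threading.Thread(target=worker, args=(n,))
               for n in range(n_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return wire
```

A wire such as method(1), method(2), header(1), ... fails the check, which is the situation the server reports as unexpected_frame. The lock is only one way to model the fix; giving each thread or process its own channel or connection avoids the sharing altogether.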
 
=ERROR REPORT==== 10-Jul-2017::09:46:29 ===
closing AMQP connection <0.8025.6900> (<ip>:45110 -> <ip>:5672):
missed heartbeats from client, timeout: 60s


You have clients losing connections or "going away" silently with a moderately high
timeout. This is not necessarily an indication of an issue (in some systems
this is entirely routine) but it could be.
 


Aswin Roy

Jul 14, 2017, 10:50:24 AM
to rabbitm...@googlegroups.com
Hi,

Ignore the last two logs (from 10th July). I have changed my code to have separate consumers in the different processes I have. 

Regarding the first set of logs from 14th July, does it have anything to do with a time-intensive task blocking the acknowledgment sent back to the server? Do you think this can be fixed if I initialize heartbeat_interval to 0 while creating the SelectConnection?



Michael Klishin

Jul 14, 2017, 10:54:56 AM
to rabbitm...@googlegroups.com
Heartbeat timeouts are pretty thoroughly documented (see the heartbeats guide linked earlier in this thread).

Since any traffic on a connection is considered to be a valid heartbeat, a blocked
client that also cannot send out heartbeat frames (e.g. because it's single threaded and blocked
its I/O loop) will eventually be considered unavailable with exactly this error.
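
The effect of a blocked I/O loop can be sketched with a small simulation on a simulated clock; no broker or pika is involved. It is a deliberately simplified model: the client is single threaded, it can only send a heartbeat between handler calls, and heartbeats are the only traffic it sends. The function names and the 30 second heartbeat interval are assumptions for illustration; the 60 second timeout is the value from the server log.

```python
def longest_silence(handler_durations, heartbeat_every=30.0):
    """Longest stretch in seconds during which the client sent nothing."""
    now = last_sent = longest = 0.0
    for duration in handler_durations:
        now += duration  # the handler runs; the loop cannot send anything
        if now - last_sent >= heartbeat_every:
            longest = max(longest, now - last_sent)
            last_sent = now  # back in the loop: the overdue heartbeat goes out
    return longest


def server_would_close(handler_durations, timeout=60.0):
    # The server gives up once the silence exceeds the heartbeat timeout.
    return longest_silence(handler_durations) > timeout
```

With handlers that each take one second the client is never silent for longer than the heartbeat interval, while a single handler that blocks for three minutes produces a silence well past the timeout. In this model, keeping the handler short and moving the slow work elsewhere is what keeps the connection alive.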



