Workaround for error when creating connection with "stop_ioloop_on_close=False"

Michael Penza

Apr 6, 2013, 1:41:15 PM
to pika-...@googlegroups.com

I'm not sure whether I found a problem or I'm doing something wrong, but the "Asynchronous Consumer Example" (https://pika.readthedocs.org/en/latest/examples/asynchronous_consumer_example.html) crashes when the socket is closed.  I was using the sample to learn how to implement a reconnection algorithm.  When the socket is closed (either by using TCPView or by adding passive=True to the exchange_declare call), an exception is thrown.  The tail of the log, the stack trace, and the exception are at the bottom of this post.

This appears to occur because of the "stop_ioloop_on_close=False" argument passed to SelectConnection().  However, without it, the on_connection_closed() callback isn't reliably called and the timer-based reconnect() callback is never called.

So here is my workaround.  First, call SelectConnection with "stop_ioloop_on_close=True".  Then, change the main function to:

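import time  # needed for time.sleep() below

def main():
    logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
    example = ExampleConsumer('amqp://guest:guest@localhost:5672/%2F')
    try:
        # With stop_ioloop_on_close=True, run() returns when the connection
        # closes, so loop to reconnect after a short pause.
        while True:
            example.run()
            LOGGER.warning('Connection closed, reopening in 5 seconds')
            time.sleep(5)
    except KeyboardInterrupt:
        example.stop()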
The sample should now run correctly and reconnect automatically on disconnect.  You could also remove the reconnection code from the consumer class, as it's no longer used.  As an aside, I've also started checking the error code in the on_channel_close() callback.  I'm trying to verify the existence of a queue using queue_declare() with passive=True, and the only way to detect that it doesn't exist is to see that the channel is closed with a 404.  By catching it there, I don't go into a continuous connect/disconnect loop.
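
The 404 check described above could look roughly like the sketch below. It assumes the callback receives (channel, reply_code, reply_text), as the example consumer does with pika 0.9.x; the class name, the attributes, and the NOT_FOUND constant are hypothetical and not part of pika.

```python
# Hypothetical helper for illustration; not part of pika.
NOT_FOUND = 404  # AMQP reply code for a missing queue or exchange


class ChannelCloseTracker(object):
    """Records why a channel closed so the caller can decide whether to reconnect."""

    def __init__(self):
        self.should_reconnect = True
        self.missing_resource = False

    def on_channel_closed(self, channel, reply_code, reply_text):
        # Assumed signature: matches the example consumer for pika 0.9.x.
        if reply_code == NOT_FOUND:
            # A passive declare failed; reconnecting would only fail again.
            self.missing_resource = True
            self.should_reconnect = False
        else:
            self.should_reconnect = True
```

The reconnect loop in main() could then consult should_reconnect before sleeping and calling run() again, which is one way to avoid the continuous connect/disconnect loop.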

ERROR      2013-04-06 13:17:27,424 pika.adapters.base_connection  _handle_error                        271 : Socket Error on fd 520: 10054
WARNING    2013-04-06 13:17:27,444 pika.adapters.base_connection  _check_state_on_disconnect           157 : Socket closed when connection was open
WARNING    2013-04-06 13:17:27,453 pika.connection                _on_disconnect                       1161: Disconnected from RabbitMQ at ec2-50-19-15-10.compute-1.amazonaws.com:5672 (0): Not specified
INFO       2013-04-06 13:17:27,459 pika.channel                   _on_close                            833 : <METHOD(['frame_type=1', 'channel_number=1', "method=<Channel.Close(['reply_code=0', 'reply_text=Not specified'])>"])>
WARNING    2013-04-06 13:17:27,467 pika.channel                   _on_close                            836 : Received remote Channel.Close (0): Not specified
WARNING    2013-04-06 13:17:27,476 __main__                       on_channel_closed                    133 : Channel 1 was closed: (0) Not specified
WARNING    2013-04-06 13:17:27,486 pika.connection                close                                586 : Invoked while closing or closed
WARNING    2013-04-06 13:17:27,500 __main__                       on_connection_closed                 81  : Connection closed, reopening in 5 seconds: (0) Not specified
ERROR      2013-04-06 13:17:27,914 pika.adapters.base_connection  _handle_events                       297 : Error event 8, error(10038, 'An operation was attempted on something that is not a socket')

Traceback (most recent call last):
  File "C:\RPi\Perforce\Field Comm Adapter RabbitMQ to Cassandra\test.py", line 347, in <module>
    main()
  File "C:\RPi\Perforce\Field Comm Adapter RabbitMQ to Cassandra\test.py", line 341, in main
    example.run()
  File "C:\RPi\Perforce\Field Comm Adapter RabbitMQ to Cassandra\test.py", line 317, in run
    self._connection.ioloop.start()
  File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 102, in start
    self.poller.start()
  File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 259, in start
    self.poll()
  File "C:\Python27\lib\site-packages\pika\adapters\select_connection.py", line 222, in poll
    return self._handler(self.fileno, ERROR, error)
  File "C:\Python27\lib\site-packages\pika\adapters\base_connection.py", line 298, in _handle_events
    self._handle_error(error)
  File "C:\Python27\lib\site-packages\pika\adapters\base_connection.py", line 271, in _handle_error
    self.socket.fileno(), error_code)
AttributeError: 'NoneType' object has no attribute 'fileno'

Gavin M. Roy

Apr 7, 2013, 7:33:10 PM
to pika-...@googlegroups.com
Which version were you using? I just noticed a bug in BaseConnection with 0.9.13p[0,1] that might have caused this.



Gavin M. Roy

Apr 7, 2013, 7:58:49 PM
to pika-...@googlegroups.com
In my tests, using 0.9.13r1 from GitHub master addresses the issue you ran into.

I plan on releasing this in the next few days, if you want to play around with it and confirm whether it addresses your issue.

Michael Penza

Apr 7, 2013, 8:10:55 PM
to pika-...@googlegroups.com
Thanks for taking a look and replying.  I'm running version 0.9.12 (it was the default version installed by pip).  I'll try out the new version when it's released.  I'm not in a rush, though; the workaround seems to be working fine for me.

rowla...@gmail.com

May 29, 2013, 8:25:13 AM
to pika-...@googlegroups.com
Hi there.

I see the exact same issue if I try to bind to an exchange that doesn't exist. It can be reproduced very easily with the example asynchronous consumer code by changing the setup_exchange function to declare a differently named exchange, so that the exchange doesn't exist when binding takes place.

I see the 10038 error followed by the Python attribute error in the following code from _handle_error() in base_connection:

        else:
            # Haven't run into this one yet, log it.
            LOGGER.error("Socket Error on fd %d: %s",
                         self.socket.fileno(), error_code)

...in this case self.socket is None.
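
For illustration only, here is one way a logging call like this could be guarded against a missing socket. This is a sketch, not the actual pika fix; describe_fd and log_socket_error are hypothetical helpers, and -1 is an arbitrary placeholder for "no file descriptor".

```python
import logging
import socket

LOGGER = logging.getLogger(__name__)


def describe_fd(sock):
    """Return the socket's file descriptor, or -1 if the socket is gone."""
    if sock is None:
        return -1
    try:
        return sock.fileno()
    except (OSError, socket.error):
        # Some Python versions raise on a closed socket instead of returning -1.
        return -1


def log_socket_error(sock, error_code):
    # Same message format as the snippet above, but safe when sock is None.
    LOGGER.error("Socket Error on fd %d: %s", describe_fd(sock), error_code)
```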

I'm running pika 0.9.13. Is the workaround described above the right way to handle this, or is this something that could be fixed in pika?

Thanks,
Jon