On Feb 3, 2013, at 6:36 AM, Blair Zajac <bl...@orcaware.com> wrote:
>
>
I'm not able to reproduce this here using a single connection (or two different URLs pointing to
the same broker: urls = ['pyamqp://', 'pyamqp://localhost']).
amqplib, the library that py-amqp is a fork of, never had detailed exceptions,
so ensure had no way of knowing the difference between a recoverable and an irrecoverable
error class. For that reason ensure will try to detect an irrecoverable operation by seeing
if the error persists even though we already re-established the connection.
drain_events is special here in that it may not return until there is a message or an error,
so the detection does not work well and is likely to lead to "false positives".
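
To illustrate the heuristic, here is a minimal sketch. It is not the actual kombu code; ensure_sketch, operation and reconnect are made-up names, and OSError just stands in for whatever the transport raises:

```python
def ensure_sketch(operation, reconnect, errors=(OSError,)):
    """Run operation(), reconnecting once if it raises one of `errors`.

    Without detailed exception classes, the only hint that an error is
    irrecoverable is that it shows up again after a fresh reconnect.
    """
    reconnected = False
    while True:
        try:
            return operation()
        except errors:
            if reconnected:
                # Same kind of failure even though the connection was
                # just re-established: assume retrying will not help.
                raise
            reconnect()
            reconnected = True
```

With a short operation such as a publish this works reasonably well. With a call that blocks like drain_events, the second error may presumably come from a new, unrelated outage long after the reconnect, and the sketch would still treat it as the same persistent error.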
I'm working on fixing this problem by introducing a new exception hierarchy in py-amqp 2.0:
https://github.com/celery/py-amqp/blob/master/amqp/exceptions.py#L21-L31
This also adds kombu.Connection.recoverable_connection_errors.
The plan is to have this ready for kombu 3.0, which needs to be ready before Celery 3.1
(in a few months is the current ETA).
In the meantime you should not use .ensure with drain_events/eventloop;
instead, catch the exceptions yourself:

    from kombu.common import eventloop

    # queue and cb are the queue and callback from your script.
    def consume_forever(conn):
        while 1:
            try:
                print('Connecting to {0}'.format(conn.as_uri()))
                conn.ensure_connection()
                with conn.Consumer(queue, callbacks=[cb]) as consumer:
                    for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                        pass
            # Both attributes are tuples of exception classes.
            except conn.connection_errors + conn.channel_errors as exc:
                print('Connection lost: {0!r}'.format(exc))

>
> with kombu.Connection(urls) as conn:
>     conn.ensure_connection()
>     with conn.Consumer(queue, callbacks=[cb]) as consumer:
>         for _ in kombu.common.eventloop(conn, timeout=1, ignore_timeouts=True):
>             pass
eventloop doesn't handle connection errors.
On some systems the connection would mysteriously hang, and this was fixed
by adding a timeout to the socket recv call. That is pretty much all eventloop provides
over calling drain_events directly; it was just tiresome to write that code every time.
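
Roughly speaking, it amounts to something like the sketch below. This is a simplified illustration, not the code from kombu.common; FakeConnection is a made-up stand-in so the example runs without a broker:

```python
import socket
from itertools import count


def eventloop_sketch(conn, limit=None, timeout=None, ignore_timeouts=False):
    """Call conn.drain_events() forever, or `limit` times if given."""
    for _ in (range(limit) if limit else count()):
        try:
            yield conn.drain_events(timeout=timeout)
        except socket.timeout:
            # Only timeouts are swallowed; any other error propagates.
            if timeout and not ignore_timeouts:
                raise


class FakeConnection(object):
    """Hypothetical stand-in that replays a scripted list of results."""

    def __init__(self, script):
        self.script = list(script)

    def drain_events(self, timeout=None):
        item = self.script.pop(0)
        if isinstance(item, Exception):
            raise item
        return item
```

Note that only socket.timeout is caught, so a broken pipe or a closed connection goes straight to the caller, which is why you need your own try/except around the loop.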
>
> then I get the following with a `rabbitmqctl stop_app`:
>
> Traceback (most recent call last):
>   File "./hello_consumer.py", line 36, in <module>
>     pass
>   File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/messaging.py", line 379, in __exit__
>     self.cancel()
>   File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/messaging.py", line 409, in cancel
>     cancel(tag)
>   File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/channel.py", line 1610, in basic_cancel
>     self._send_method((60, 30), args)
>   File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 58, in _send_method
>     self.channel_id, method_sig, args, content)
>   File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 218, in write_method
>     write_frame(1, channel, payload)
>   File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/transport.py", line 149, in write_frame
>     frame_type, channel, size, payload, 0xce))
>   File "/usr/lib/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> socket.error: [Errno 32] Broken pipe
>
>
> Suggestions welcome.
>
> Thanks,
> Blair
--
Ask Solem
twitter.com/asksol