Understanding failover features

Blair Zajac

Jan 29, 2013, 11:22:17 PM
to carrot...@googlegroups.com
Hello,

I'm starting a project that's using RabbitMQ's HA queues and wanted to
use Kombu to set up connections to 3 servers with {"ha-mode":"all"}.

I'm using Ubuntu 12.10 as a client using virtualenv kombu 2.5.4 and its
amqp 1.0.6 dependency talking to three RabbitMQ 3.0.1 servers on Ubuntu
Raring.

Any tests I've done with this little script have failed:

#!/usr/bin/env python

from kombu import Connection
import datetime

url = 'pyamqp://guest:guestc@%s:5672//'
ips = ['192.168.2.%s' % x for x in range(2, 5)]
urls = [url % ip for ip in ips]

with Connection(urls) as conn:
    simple_queue = conn.SimpleQueue('ha-two-simple-queue')
    message = 'helloword, sent at %s' % datetime.datetime.today()
    simple_queue.put(message)
    print('Sent: %s' % message)
    simple_queue.close()



Tests I've done include:

1) 'rabbitmqctl stop_app' on 192.168.2.2. The script immediately fails
to connect. Adding a conn.ensure_connection() before creating the
SimpleQueue works. Is this intended? Partial stack trace:

  File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/messaging.py", line 200, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/connection.py", line 745, in default_channel
    self.connection
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/connection.py", line 730, in connection
    self._connection = self._establish_connection()
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/connection.py", line 689, in _establish_connection
    conn = self.transport.establish_connection()


It looks like Connection#connect should honor the list of URLs or the
pydoc should be updated to state that it only connects to one host.


2) Leaving 192.168.2.2 on and then using 'rabbitmqctl stop_app' the
script immediately fails and doesn't reconnect.

I haven't tried other scenarios, but given my initial testing, I was
thinking/hoping that these would be supported without additional
wrapping of kombu.

Is there a page describing what is and what is not currently supported
for automatic retries?

I would submit some patches, but looking at the code, I'm not sure how
to best address these that fits in the design of the module.

Thanks,
Blair

Ask Solem

Jan 30, 2013, 7:35:06 AM
to carrot...@googlegroups.com
Hey Blair,

There's no "automatic failover", instead there are primitives you can use to automatically handle failure.

The failover part is very recent, and is currently being tested in production to map out what the best
practices are.

Sadly I haven't had much time to write docs for Kombu, but this is something I'm going to do soon,
including documenting the best practices for failover.

Use ensure_connection to establish a connection, and ensure/autoretry for operations.

Some operations also support a retry argument, so you don't have to wrap ensure around it manually.

Producers are a bit special since you often have to re-declare the queues when the connection comes back up,
so the best practice for publishing a message is to use publish(retry=True, declare=[dest_queue]) (or dest_exchange if the queue is only declared on the receiving end).

Producer example:

from kombu import Connection, Exchange, Producer, Queue

connection = Connection()
connection.ensure_connection()

p = Producer(connection)

my_exchange = Exchange('ex', type='direct')
my_queue = Queue('q', my_exchange, routing_key='key')

# body is the message payload you want to send
p.publish(body, exchange=my_exchange, routing_key='key',
          retry=True, declare=[my_queue])



Also note that most of these support a 'retry_policy' argument which sets the
maximum retry time, number of attempts and so on.
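
To make the retry_policy knobs concrete, here is a minimal sketch of how such a policy could translate into a wait schedule. The key names (interval_start, interval_step, interval_max, max_retries) are the ones I believe kombu accepts in retry_policy; the function retry_intervals is hypothetical, uses only the standard library, and is an illustration of the arithmetic, not kombu's own code.

```python
def retry_intervals(interval_start=2, interval_step=2,
                    interval_max=30, max_retries=3):
    """Yield the number of seconds to wait before each retry attempt.

    Hypothetical helper: the wait starts at interval_start, grows by
    interval_step per attempt, and is capped at interval_max.
    """
    interval = interval_start
    for _ in range(max_retries):
        yield min(interval, interval_max)
        interval += interval_step


# Same shape as a dict you might pass as retry_policy=...
policy = {
    'interval_start': 0,  # first retry happens immediately
    'interval_step': 2,   # add 2 seconds after every failed attempt
    'interval_max': 5,    # never wait longer than 5 seconds
    'max_retries': 5,     # give up after 5 retries
}

schedule = list(retry_intervals(**policy))
print(schedule)  # [0, 2, 4, 5, 5]
```

With a policy like this, the total time spent waiting before giving up is bounded by the sum of the schedule (16 seconds here), which is a quick way to sanity-check a policy against how long a broker restart usually takes.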


--
Ask Solem
twitter.com/asksol

Blair Zajac

Feb 3, 2013, 1:36:30 AM
to carrot...@googlegroups.com
Hey Ask,

Thanks for the response, I finally got some time to test this. Publishing works in the face of `rabbitmqctl stop_app`, but I didn't find any working retry for consuming events. In this attempt the client switches over (as shown by lsof) but doesn't get any new events:


#!/usr/bin/env python

import os
import kombu
import kombu.common

url = 'pyamqp://linted:PJ52HGBw7J7uR8t7c@%s:5672//'
ips = ['192.168.2.%s' % x for x in range(2, 5)]
urls = sorted([url % ip for ip in ips], reverse=False)

exchange = kombu.Exchange(name='blair-test', type='direct', durable=True)
queue = kombu.Queue('ha-all.simple-queue', exchange=exchange,
                    routing_key='key')

def cb(body, message):
    print("Received: %s" % message.payload)
    message.ack()

def errback(exc, interval):
    print("Couldn't publish message: %r. Retry in %ds" % (exc, interval))

with kombu.Connection(urls) as conn:
    conn.ensure_connection()
    with conn.Consumer(queue, callbacks=[cb]) as consumer:
        drain_events = conn.ensure(consumer, conn.drain_events,
                                   errback=errback)
        while True:
            drain_events()


It gets here...

Received: helloword, sent at 2013-02-02 21:57:34.527050
Received: helloword, sent at 2013-02-02 21:57:35.528685
Received: helloword, sent at 2013-02-02 21:57:36.530302
Received: helloword, sent at 2013-02-02 21:57:37.531760
Received: helloword, sent at 2013-02-02 21:57:38.533401
Couldn't publish message: error(104, 'Connection reset by peer'). Retry in 0s
Couldn't publish message: error(111, 'Connection refused'). Retry in 0s

and hangs.  lsof shows it switched to the 2nd IP address and strace shows it blocking waiting on a read:

$ strace -p 9555
Process 9555 attached - interrupt to quit
recvfrom(6, 

$ lsof -n -p 9555 | grep '6u'
python  9555 blair    6u  IPv4 6558858      0t0      TCP 192.168.2.1:60966->192.168.2.3:amqp (ESTABLISHED)


If I try the following

with kombu.Connection(urls) as conn:
    conn.ensure_connection()
    with conn.Consumer(queue, callbacks=[cb]) as consumer:
        for _ in kombu.common.eventloop(conn, timeout=1, ignore_timeouts=True):
            pass

then I get the following with a `rabbitmqctl stop_app`:

Traceback (most recent call last):
  File "./hello_consumer.py", line 36, in <module>
    pass
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/messaging.py", line 379, in __exit__
    self.cancel()
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/kombu/messaging.py", line 409, in cancel
    cancel(tag)
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/channel.py", line 1610, in basic_cancel
    self._send_method((60, 30), args)
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 58, in _send_method
    self.channel_id, method_sig, args, content)
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 218, in write_method
    write_frame(1, channel, payload)
  File "/tmp/pyvenv/local/lib/python2.7/site-packages/amqp/transport.py", line 149, in write_frame
    frame_type, channel, size, payload, 0xce))
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.error: [Errno 32] Broken pipe


Suggestions welcome.

Thanks,
Blair

Ask Solem

Feb 4, 2013, 7:48:41 AM
to carrot...@googlegroups.com

On Feb 3, 2013, at 6:36 AM, Blair Zajac <bl...@orcaware.com> wrote:



I'm not able to reproduce here using a single connection (or two different URLs pointing to the
same broker: urls = ['pyamqp://', 'pyamqp://localhost']).


amqplib, the library that py-amqp is a fork of, never had detailed exceptions,
so ensure had no way of knowing the difference between a recoverable and an irrecoverable
error class. For that reason ensure will try to detect an irrecoverable operation by seeing
if the error persists even though we already re-established the connection.
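
The heuristic described above can be sketched roughly as follows. This is a simplified stand-in under my own assumptions, not kombu's implementation: ensure_sketch, reconnect, flaky and always_broken are hypothetical names, and OSError stands in for the transport's connection errors.

```python
def ensure_sketch(fun, reconnect, errors=(OSError,)):
    """Wrap fun so one connection error triggers a reconnect and a retry.

    If the call fails again on the freshly re-established connection,
    the error is assumed to be irrecoverable and is allowed to propagate.
    """
    def wrapped(*args, **kwargs):
        try:
            return fun(*args, **kwargs)
        except errors:
            reconnect()
            # No try/except here: a repeat failure right after a
            # successful reconnect is treated as irrecoverable.
            return fun(*args, **kwargs)
    return wrapped


state = {'calls': 0, 'reconnects': 0}

def reconnect():
    state['reconnects'] += 1

def flaky():
    # Fails on the first call only, like a broker that went away once.
    state['calls'] += 1
    if state['calls'] == 1:
        raise ConnectionResetError(104, 'Connection reset by peer')
    return 'ok'

def always_broken():
    # Fails no matter how often we reconnect.
    raise ConnectionRefusedError(111, 'Connection refused')

result = ensure_sketch(flaky, reconnect)()

gave_up = False
try:
    ensure_sketch(always_broken, reconnect)()
except ConnectionRefusedError:
    gave_up = True

print(result, state['reconnects'], gave_up)
```

The sketch only behaves sensibly when the wrapped call returns or fails promptly after the reconnect, which is the assumption that a long-blocking call does not satisfy.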

drain_events is special here in that it may never return until there is a message or an error,
so the detection does not work well, and is likely to lead to "false positives".

I'm working on fixing this problem by introducing a new exception hierarchy in py-amqp 2.0:
https://github.com/celery/py-amqp/blob/master/amqp/exceptions.py#L21-L31,
and this also adds kombu.Connection.recoverable_connection_errors

The plan is to have this ready for kombu 3.0, which needs to be ready before Celery 3.1
(in a few months is the current ETA).

In the meantime you should not use .ensure with drain_events/eventloop,
and should instead catch the exceptions yourself:

from kombu.common import eventloop

# queue and cb are the Queue and callback from your script
def consume_forever(conn):
    while 1:
        try:
            print('Connecting to {0}'.format(conn.as_uri()))
            conn.ensure_connection()
            with conn.Consumer(queue, callbacks=[cb]) as consumer:
                for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
                    pass
        except conn.connection_errors + conn.channel_errors as exc:
            print('Connection lost: {0!r}'.format(exc))


>
> with kombu.Connection(urls) as conn:
>     conn.ensure_connection()
>     with conn.Consumer(queue, callbacks=[cb]) as consumer:
>         for _ in kombu.common.eventloop(conn, timeout=1, ignore_timeouts=True):
>             pass


eventloop doesn't handle connection errors.
On some systems the connection would mysteriously hang, and this was fixed
by adding a timeout to the socket recv call, so that is pretty much all this provides
over calling drain_events, but it was tiresome to write that code every time.
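
For illustration, a loop of that kind could look roughly like the sketch below: call drain_events with a timeout, swallow the timeout, and let every other error through. eventloop_sketch and FakeConnection are hypothetical names for this example; the fake connection only replays a scripted list of events so the behaviour can be seen without a broker.

```python
import itertools
import socket


def eventloop_sketch(conn, limit=None, timeout=None, ignore_timeouts=False):
    """Repeatedly call conn.drain_events(timeout=...), yielding each result.

    Socket timeouts are swallowed when ignore_timeouts is true; any other
    error (for example a lost connection) propagates to the caller.
    """
    counter = range(limit) if limit else itertools.count()
    for _ in counter:
        try:
            yield conn.drain_events(timeout=timeout)
        except socket.timeout:
            if timeout and not ignore_timeouts:
                raise


class FakeConnection:
    """Hypothetical stand-in that replays a list of events and errors."""

    def __init__(self, script):
        self.script = list(script)

    def drain_events(self, timeout=None):
        event = self.script.pop(0)
        if isinstance(event, Exception):
            raise event
        return event


conn = FakeConnection([
    'msg-1',
    socket.timeout(),  # idle period: ignored by the loop
    'msg-2',
    ConnectionResetError(104, 'Connection reset by peer'),
])

seen = []
lost = None
try:
    for event in eventloop_sketch(conn, timeout=1, ignore_timeouts=True):
        seen.append(event)
except ConnectionResetError as exc:
    lost = exc  # the caller has to handle this, e.g. by reconnecting

print(seen, repr(lost))
```

The idle timeout disappears inside the loop while the connection reset reaches the caller, which is why a loop like this still needs a try/except around it to survive a broker going down.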





--
Ask Solem
twitter.com/asksol
