hang in publish on disconnections


Fabio Sangiovanni

Jul 10, 2014, 12:24:39 PM
to carrot...@googlegroups.com
Hi everybody,

I'm new to kombu and to Python in general, so please forgive me if I state (or miss!) obvious things in this post.
I'm trying to implement a simple filter that consumes messages from a queue, processes them, and then publishes the results to another queue.
At the moment my code is as follows:

from kombu import Connection, Exchange, Queue, Producer, Consumer
from uem import MessageFormatter

exchange = Exchange('s2a_json_ex', type='direct')
queue = Queue('s2a_queue')
url = 'amqp://user:pass...@192.168.8.137:5672//'

def ensure_conn_eb(exc, interval):
    print 'Connection failed. Reconnecting in %ss' % interval

def message_received(msg, raw):
    uem_msg = MessageFormatter.get_final_msg(msg)
    print 'before publish'
    producer.publish(uem_msg)
    print 'after publish'
    raw.ack()


while True:
    try:
        with Connection(url) as conn:
            conn.ensure_connection(errback=ensure_conn_eb)
            if conn.connected:
                print 'Connected.'

            with conn.Producer(exchange=exchange, auto_declare=False, routing_key='s2a_json_rk',
                               serializer='json') as producer:

                with conn.Consumer(queue, auto_declare=False, accept=['pickle'],
                                   callbacks=[message_received]) as consumer:

                    while True:
                        conn.drain_events()

    except conn.recoverable_channel_errors + conn.recoverable_connection_errors as e:
        continue


Everything works as expected, except when the connection drops. I tested three scenarios:
1. Manual restart of the RabbitMQ service: reconnection works.
2. rabbitmqctl stop_app followed by rabbitmqctl start_app: reconnection fails, and the last thing the program prints is 'before publish'.
3. Forced disconnection from the RabbitMQ management plugin: same as 2.

It seems that publish raises no exception in scenarios 2 and 3. As further evidence, in both scenarios netstat on the broker machine still shows the connection as ESTABLISHED (even though in 3 the connection no longer appears in the web plugin).
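
To make the symptom concrete, here is a small stdlib-only sketch. It is not kombu code, and the helper name read_from_silent_peer is made up for illustration. It assumes what I suspect, but have not confirmed, is going on in cases 2 and 3: the TCP connection stays open while the other side simply stops talking. In that state a blocking read never fails by itself; an error only surfaces if a timeout is set on the socket.

```python
import socket
import threading


def read_from_silent_peer(timeout):
    """Connect to a local peer that accepts the connection but never sends
    anything, then try to read from it. Returns a short description of
    what happened. (Hypothetical helper, for illustration only.)"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 0))    # port 0: let the OS pick a free port
    server.listen(1)
    accepted = []

    def accept_and_stay_silent():
        peer, _ = server.accept()
        accepted.append(peer)        # keep the socket open, never write to it

    t = threading.Thread(target=accept_and_stay_silent)
    t.daemon = True
    t.start()

    client = socket.create_connection(server.getsockname())
    # With timeout=None this recv() would block forever: the connection is
    # established, so the OS has no error to report.
    client.settimeout(timeout)
    try:
        client.recv(1)
        return 'data or EOF'
    except socket.timeout:
        return 'timed out'
    finally:
        t.join()
        client.close()
        for peer in accepted:
            peer.close()
        server.close()


if __name__ == '__main__':
    print(read_from_silent_peer(0.2))
```

If this is really the mechanism, it would explain why nothing is raised: as far as the client's socket is concerned, the connection is still healthy.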

Is there a way to get exceptions raised in these situations? More generally, am I missing something in the code above? I deliberately avoided conn.ensure() so that I have more control over exceptions (for debugging) and can use a single connection, even across restarts, for both publishing and consuming.

Any advice is welcome.

Thanks!!