Hi everybody,
I'm quite a novice of kombu and of python in general, so please forgive me if I state (or miss!) obvious things in this post.
I'm trying to realize a simple filter that consumes messages from a queue, processes them and then publishes the results to another queue.
At the moment my code is as follows:
from kombu import Connection, Exchange, Queue, Producer, Consumer
from uem import MessageFormatter
exchange = Exchange('s2a_json_ex', type='direct')
queue = Queue('s2a_queue')
url = 'amqp://
user:pass...@192.168.8.137:5672//'
def ensure_conn_eb(exc, interval):
print 'Connection failed. Reconnecting in %ss' % interval
def message_received(msg, raw):
uem_msg = MessageFormatter.get_final_msg(msg)
print 'before publish'
producer.publish(uem_msg)
print 'after publish'
raw.ack()
while True:
try:
with Connection(url) as conn:
conn.ensure_connection(errback=ensure_conn_eb)
if conn.connected:
print 'Connected.'
with conn.Producer(exchange=exchange, auto_declare=False, routing_key='s2a_json_rk',
serializer='json') as producer:
with conn.Consumer(queue, auto_declare=False, accept=['pickle'],
callbacks=[message_received]) as consumer:
while True:
conn.drain_events()
except conn.recoverable_channel_errors + conn.recoverable_connection_errors as e:
continue
Everything works as expected, but I have problems with disconnections, as follows:
1. RabbitMQ service manual restart: reconnection ok
2. rabbitmqctl stop_app followed by rabbitmqctl start_app: reconnection fails, program last print is 'before publish'
3. force disconnection in RabbitMQ management plugin: same as 2
It seems that there's no exception raised in publish in situations 2 and 3. One further proof is that in situations 2 and 3 I can still see the connection as ESTABLISHED with netstat on the broker machine (even if in 3 the connections isn't showed anymore on the web plugin).
Is there some way to have exceptions raised in these situations? And more in general, am I missing something in the code above? I've explicitly avoided conn.ensure() to have more control over exceptions (for debugging purposes) and to be able to use a single connection (even over restarts) both for publishing and consuming.
Any advice is welcome.
Thanks!!