I am testing RabbitMQ and am trying to use Kombu with it. When feeding large amount of messages from multiple process, it seems the connection gets behind and drops messages.
I took one of the example code and modified it a bit to send multiple messages (see below). I noticed that if run ten of those, each sending 10000 messages, like this:
for i in $(seq 10) ; do python kombu_pub.py 10000 & done
Only about 99000 make it. It varies. But if I put a time.sleep(5) at the end of the program, all messages make it.
Calling connection.close() or connection.release() doesn't change this behavior.
Is there a way to ensure all communications are done before closing the program?
Thanks
-Mathieu
import sys,time
from kombu.pools import producers
from kombu import Exchange, Queue
import kombu.common
task_exchange = Exchange('tasks', type='direct')
task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
Queue('midpri', task_exchange, routing_key='midpri'),
Queue('lopri', task_exchange, routing_key='lopri')]
priority_to_routing_key = {'high': 'hipri',
'mid': 'midpri',
'low': 'lopri'}
def send_as_task(connection, fun, args=(), kwargs={}, priority='mid'):
payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
routing_key = priority_to_routing_key[priority]
with producers[connection].acquire(block=True) as producer:
producer.publish(payload,
serializer='pickle',
compression='bzip2',
exchange=task_exchange,
declare=[task_exchange],
routing_key=routing_key)
if __name__ == '__main__':
from kombu import Connection
counter = 0
connection = Connection('amqp://')
for item in [task_exchange,]+task_queues:
kombu.common.maybe_declare(item,connection)
for i in range(int(sys.argv[1])):
send_as_task(connection, fun=send_as_task, args=('Kombu', i), kwargs={},
priority='high')
counter += 1
print 'msg sent: %7d\r' % counter,
print 'msg sent: %7d' % counter
# time.sleep(5)