I am testing RabbitMQ and am trying to use Kombu with it. When feeding a large number of messages from multiple processes, it seems the connection falls behind and drops messages.
I took one of the example programs and modified it a bit to send multiple messages (see below). I noticed that if I run ten instances of it, each sending 10000 messages, like this:
for i in $(seq 10) ; do python kombu_pub.py 10000 & done
only about 99000 of the 100000 messages make it; the exact number varies. But if I put a time.sleep(5) at the end of the program, all messages make it.
Calling connection.close() or connection.release() doesn't change this behavior.
Is there a way to ensure all communications are done before closing the program?
Thanks
-Mathieu
import sys,time                                                                        
from kombu.pools import producers                                                      
from kombu import Exchange, Queue                                                      
import kombu.common                                                                    
                                                                                       
# Single direct exchange that every task queue below is bound to.
task_exchange = Exchange(name='tasks', type='direct')

# One queue per priority level; each queue's routing key equals its name.
task_queues = [
    Queue(name='hipri', exchange=task_exchange, routing_key='hipri'),
    Queue(name='midpri', exchange=task_exchange, routing_key='midpri'),
    Queue(name='lopri', exchange=task_exchange, routing_key='lopri'),
]

# Maps the caller-facing priority label to the routing key used on publish.
priority_to_routing_key = dict(high='hipri', mid='midpri', low='lopri')
                                                                                       
                                                                                       
def send_as_task(connection, fun, args=(), kwargs=None, priority='mid'):
    """Publish a description of a call (function plus arguments) as a task.

    Args:
        connection: Connection used to look up the producer pool.
        fun: Object stored under the ``'fun'`` key of the message payload.
        args: Positional arguments stored under ``'args'``.
        kwargs: Keyword arguments stored under ``'kwargs'``. ``None`` (the
            default) is sent as an empty dict.
        priority: Key of ``priority_to_routing_key`` selecting the routing
            key the message is published with.

    Raises:
        KeyError: If ``priority`` is not a key of ``priority_to_routing_key``.
    """
    # Build a fresh dict per call instead of sharing one mutable default
    # object between every invocation.
    if kwargs is None:
        kwargs = {}

    routing_key = priority_to_routing_key[priority]
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}

    # The producer is handed back to the pool when the with-block exits.
    with producers[connection].acquire(block=True) as producer:
        # NOTE: the payload is pickled; consumers must only unpickle messages
        # from producers they trust, since unpickling can execute code.
        producer.publish(payload,
                         serializer='pickle',
                         compression='bzip2',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key=routing_key)
                                                                                       
if __name__ == '__main__':
    # Usage: python kombu_pub.py <number-of-messages>
    # Publishes that many 'high' priority task messages and reports progress
    # on a single, continuously rewritten terminal line.
    import kombu.pools
    from kombu import Connection

    total = int(sys.argv[1])
    counter = 0
    connection = Connection('amqp://')
    try:
        # Declare the exchange and all queues up front, before publishing.
        for item in [task_exchange] + task_queues:
            kombu.common.maybe_declare(item, connection)

        for i in range(total):
            send_as_task(connection, fun=send_as_task, args=('Kombu', i),
                         kwargs={}, priority='high')
            counter += 1
            # '\r' returns to the start of the line so the next update
            # overwrites this one; no newline is written here.
            sys.stdout.write('msg sent: %7d\r' % counter)
        sys.stdout.write('msg sent: %7d\n' % counter)
    finally:
        # send_as_task publishes through the producer pool, which presumably
        # holds its own pooled connection(s) rather than `connection` itself,
        # so closing `connection` alone would leave those open at exit.
        # Resetting the pools closes them explicitly before the process ends.
        # NOTE(review): confirm kombu.pools.reset() is available in the
        # installed Kombu version and that this removes the need for a
        # trailing sleep.
        kombu.pools.reset()
        connection.release()