dropped message on busy rabbitmq server


Mathieu Longtin

Jan 28, 2014, 4:14:01 PM
to carrot...@googlegroups.com
I am testing RabbitMQ and am trying to use Kombu with it. When feeding a large number of messages from multiple processes, it seems the connection falls behind and drops messages.

I took one of the examples and modified it slightly to send multiple messages (see below). If I run ten of those at once, each sending 10000 messages, like this:

for i in $(seq 10) ; do python kombu_pub.py 10000 & done

Only about 99000 of the 100000 messages make it; the exact number varies. But if I put a time.sleep(5) at the end of the program, all messages make it.

Calling connection.close() or connection.release() doesn't change this behavior.

Is there a way to ensure all communications are done before closing the program?

Thanks

-Mathieu

from __future__ import print_function

import sys, time
from kombu.pools import producers
from kombu import Exchange, Queue
import kombu.common

task_exchange = Exchange('tasks', type='direct')
task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
               Queue('midpri', task_exchange, routing_key='midpri'),
               Queue('lopri', task_exchange, routing_key='lopri')]

priority_to_routing_key = {'high': 'hipri',
                           'mid': 'midpri',
                           'low': 'lopri'}


def send_as_task(connection, fun, args=(), kwargs=None, priority='mid'):
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs or {}}
    routing_key = priority_to_routing_key[priority]

    # Borrow a producer from the connection's pool and publish one message.
    with producers[connection].acquire(block=True) as producer:
        producer.publish(payload,
                         serializer='pickle',
                         compression='bzip2',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key=routing_key)

if __name__ == '__main__':
    from kombu import Connection

    counter = 0
    connection = Connection('amqp://')
    # Declare the exchange and the three queues before publishing.
    for item in [task_exchange] + task_queues:
        kombu.common.maybe_declare(item, connection)
    for i in range(int(sys.argv[1])):
        send_as_task(connection, fun=send_as_task, args=('Kombu', i), kwargs={},
                     priority='high')
        counter += 1
        print('msg sent: %7d\r' % counter, end='')
    print('msg sent: %7d' % counter)
    # time.sleep(5)

Ask Solem

Feb 3, 2014, 10:11:31 AM
to carrot...@googlegroups.com
There’s no local write cache, so maybe you could call os.fsync() on the connection's socket before exiting?

os.fsync(connection.connection.sock.fileno())

or, with librabbitmq:

os.fsync(connection.connection.fileno)
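
A different angle on "ensure all communications are done before closing" is to have the receiving side acknowledge each message and to exit only once every acknowledgement is in. RabbitMQ calls this publisher confirms, and Kombu's py-amqp transport is understood to expose it through a confirm_publish transport option, though that is worth checking against the version in use. The sketch below only illustrates the idea with the standard library: FakeBroker and publish_all are hypothetical names, not Kombu or RabbitMQ APIs, and the "broker" is just a thread that stores a message and then acks its sequence number.

```python
import queue
import threading


class FakeBroker(threading.Thread):
    """Hypothetical stand-in for a broker: store each message, then ack it."""

    def __init__(self):
        super().__init__(daemon=True)
        self.inbox = queue.Queue()   # messages travelling to the broker
        self.acks = queue.Queue()    # sequence numbers travelling back
        self.stored = []

    def run(self):
        while True:
            item = self.inbox.get()
            if item is None:         # sentinel: stop the broker thread
                break
            seq, body = item
            self.stored.append(body)
            self.acks.put(seq)       # ack only after the message is stored


def publish_all(broker, bodies, timeout=5.0):
    """Send every body, then block until each one has been acknowledged."""
    unconfirmed = set()
    for seq, body in enumerate(bodies):
        unconfirmed.add(seq)
        broker.inbox.put((seq, body))
    while unconfirmed:
        # Raises queue.Empty if an ack does not arrive within the timeout.
        unconfirmed.discard(broker.acks.get(timeout=timeout))
    return len(bodies)


broker = FakeBroker()
broker.start()
sent = publish_all(broker, [{"n": i} for i in range(1000)])
broker.inbox.put(None)
broker.join()
print(sent, len(broker.stored))
```

The point of the design is that the publisher's exit is gated on acknowledgements rather than on a fixed sleep, so it waits exactly as long as the receiver needs and fails loudly (via the timeout) if something is lost.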

