Trying to understand why a kombu queue is empty ... when there are multiple connections


Rich Rauenzahn

Mar 11, 2014, 6:01:26 PM3/11/14
to carrot...@googlegroups.com

I'm using a kombu SimpleQueue to make a message queue (distinct from the Celery task queue) -- messages come in over HTTP, and I have Celery tasks that poll the queue.  I'm using RabbitMQ as the backend.  It all seems to work fine in production.

But I'm adding unit tests, and in testing I run the Celery tasks with EAGER.  When I do, one task sees the first message, but the next task doesn't see any messages -- even though I can see the message sitting in the queue with RabbitMQ's admin command.

I think I know why -- more or less.

Since I am using EAGER in testing, there is an actual call stack history -- which means previous instances of the SimpleQueue might still be alive.  In production, tasks calling other tasks don't leave a call stack with live objects still in it; those objects go away.  In testing, they don't.

Here is how I make my message queue:


class MessageQueue:

    def __init__(self, ...):
        self.qname = "MessageQueue(%s)" % qname
        self.no_ack = no_ack  # defaults to True
        self.ttl = ttl
        self.connection = kombu.Connection(settings.AMQP_URL)
        self.q = self.connection.SimpleQueue(
                    self.qname,
                    no_ack=no_ack,
                    queue_opts={'queue_arguments': {'x-expires':
                                                    int(self.ttl * 1000)}},
                    serializer='pickle')

My class's receive() is more or less a wrapper around self.q.get(block=True, timeout=timeout).

My empty-queue problem went away when I called my close method on the queue before calling the next task 'eagerly':

    def close(self):
        if self.q:
            self.q.close()
        if self.connection:
            self.connection.release()

So it seems that if I have an existing connection, a new connection (from the same process, by the way) can't read from the queue.  I checked qos/prefetch, but the docs say it is ignored when no_ack is set.

So what fundamental thing am I misunderstanding?   Why must I explicitly close existing connections, and is there a way to handle this automatically?  (I guess I could call self.close() within receive()..)

Should I be trying to make the connection a single instance across all instances of this class?  And maybe even a global cache of existing SimpleQueues (per qname)? 

Rich


Ask Solem

Mar 28, 2014, 2:58:21 PM3/28/14
to carrot...@googlegroups.com
I’m not sure why, but I would guess that the connections are not properly closed somewhere (or the consumers are not properly cancelled).  You could maybe get around some of these problems by using channel.qos(prefetch_count=1) and acking one message at a time, to make sure no messages are stuck in the TCP buffers.

Using basic.consume to poll messages from a Celery task is pretty unusual though, and I would not recommend
it.  A better solution would be to add a new consumer component to the worker (using ConsumerStep):


