I'm using a kombu SimpleQueue to make a message queue (distinct from the celery task queue) -- messages come in over http, and I have celery tasks that are polling the queue. I'm using RabbitMQ as the backend. It all seems to work fine in production.
But I'm adding unit testing, and in the testing I'm using EAGER with the celery tasks, and when doing this, one task sees the first message, but the next task doesn't see any messages -- but I can see it in rabbit's queue looking with rabbit's admin command.
I think I know why -- more or less.
Since I am using EAGER in testing, I actually have a call stack history -- which means previous instances of the SimpleQueue might still be around. In production, tasks calling other tasks don't have a call history/stack with live objects still in them. They go away. In testing, I do.
Here is how I make my message queue:
class MessageQueue:
def __init__(self, ...):
self.qname = "MessageQueue(%s)" % qname
self.no_ack = no_ack # defaults to TRUE
self.ttl = ttl
self.connection = kombu.Connection(settings.AMQP_URL)
self.q = self.connection.SimpleQueue(
self.qname,
no_ack=no_ack,
queue_opts={'queue_arguments': {'x-expires':
int(self.ttl * 1000)}},
serializer='pickle')
My class's receive() is more or less a wrapped self.q.get(block=True, timeout=timeout)
My empty problem went away when I call my close method on the queue before calling the next task 'eagerly':
def close(self):
if self.q:
self.q.close()
if self.connection:
self.connection.release()
So, it seems if I have an existing connection, a new connection (from the same process, btw) can't read from the queue. I checked qos/prefetch, but it says it is ignored when no_ack is set.
So what fundamental thing am I misunderstanding? Why must I explicitly close existing connections, and is there a way to handle this automatically? (I guess I could call self.close() within receive()..)
Should I be trying to make the connection a single instance across all instances of this class? And maybe even a global cache of existing SimpleQueues (per qname)?
Rich