BlockingConnection and reusing channel

168 views
Skip to first unread message

Mateusz Głowiński

unread,
Aug 16, 2022, 5:12:34 PM8/16/22
to Pika
Hi,
I've 2 questions about using BlockingConnection.

1) I've RabbitMQ configured as described in Topic tutorial.
When I produce two messages immediately on queue, I notice that callback is called once after another immediately, the consumer is not waiting to finish one callback execution for the first message, but invokes a seconds callback call for seconds message produced to queue.
In order to prevent the race condition, is it safe to use `threading.Lock` in the callback method?

2) I want to produce a message to another queue in a callback call. Can I safely use the channel passed in the parameters of the callback function or should I use the add_callback_threadsafe somehow?

Something like this:
```
def general_callback(ch, method, properties: pika.BasicProperties, body):
result = handle_action()

if (result is not None) and (bool(properties.reply_to)):
ch.basic_publish(
exchange="",
routing_key=properties.reply_to,
body=json.dumps(result).encode(),
)
```

Regards,
Mateusz Głowiński

Gavin M. Roy

unread,
Aug 16, 2022, 7:55:01 PM8/16/22
to pika-...@googlegroups.com
On Tue, Aug 16, 2022 at 5:12 PM Mateusz Głowiński <mglowi...@gmail.com> wrote:
Hi,
I've 2 questions about using BlockingConnection.

1) I've RabbitMQ configured as described in Topic tutorial.
When I produce two messages immediately on queue, I notice that callback is called once after another immediately, the consumer is not waiting to finish one callback execution for the first message, but invokes a seconds callback call for seconds message produced to queue.
In order to prevent the race condition, is it safe to use `threading.Lock` in the callback method?


Per my other email, use basic qos with a prefetch of 1 to tell Rabbit to send you only want one message at a time.



2) I want to produce a message to another queue in a callback call. Can I safely use the channel passed in the parameters of the callback function or should I use the add_callback_threadsafe somehow?


Yes


Something like this:
```
def general_callback(ch, method, properties: pika.BasicProperties, body):
result = handle_action()

if (result is not None) and (bool(properties.reply_to)):
ch.basic_publish(
exchange="",
routing_key=properties.reply_to,
body=json.dumps(result).encode(),
)
```

Regards,
Mateusz Głowiński

--
You received this message because you are subscribed to the Google Groups "Pika" group.
To unsubscribe from this group and stop receiving emails from it, send an email to pika-python...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/pika-python/06b39c55-a701-4dcf-ac6a-990d24d1c6den%40googlegroups.com.

Mateusz Głowiński

unread,
Aug 17, 2022, 2:03:42 AM8/17/22
to Pika
Thanks you for an answer!

Mateusz Głowiński

unread,
Aug 20, 2022, 1:53:53 PM8/20/22
to Pika
@Gavin,
One more question regarding point 1.
I noticed that `time.sleep` is blocking invoking callback function simultaneously.
Why is that?

Mateusz Głowiński

unread,
Aug 21, 2022, 5:29:01 AM8/21/22
to Pika
Seems like i was wrong.
BlockingConnections calls `callback` function one by one and waits until first one ends.

I tried something like this:

channel.basic_consume(
     queue=rockwell_queue.method.queue,
     on_message_callback=threaded_general_callback,
     auto_ack=True,
)

LOCKS = defaultdict(Lock)


def general_callback(ch, method, properties, body):
     parameters = json.loads(body)
     id = int(parameters["id"])
     timeout = int(parameters.get("timeout", 3600))
     lock = LOCKS[plc_id]

     locked = lock.acquire(timeout=timeout)
     try:
          if locked:

               handle_action()

               if (result is not None) and (bool(properties.reply_to)):
                   ch.connection.add_callback_threadsafe(
                       functools.partial(
                           ch.basic_publish,
                           exchange="",
                           routing_key=properties.reply_to,
                           body=json.dumps(result).encode(),
                          )
                     )
     finally:
         if locked:
             lock.release()


def threaded_general_callback(ch, method, properties, body):
     Thread(target=general_callback, args=(ch, method, properties, body)).start()



Do you think that upper logic with threads make sense or it's better to use `SelectedConnection`?

Gavin M. Roy

unread,
Aug 22, 2022, 2:02:35 PM8/22/22
to pika-...@googlegroups.com
Don't use time.sleep, it blocks the Python interpreter from running, use BlockingConnection.sleep see https://pika.readthedocs.io/en/stable/modules/adapters/blocking.html#pika.adapters.blocking_connection.BlockingConnection.sleep

Gavin M. Roy

unread,
Aug 22, 2022, 2:03:20 PM8/22/22
to pika-...@googlegroups.com
I'd avoid using threads and depending on Python version, I'd use the AsyncIO connection adapter that the AsyncIO IOLoop.

Mateusz Głowiński

unread,
Sep 2, 2022, 9:08:39 AM9/2/22
to Pika
Thank you Gavin for an answer :)
Reply all
Reply to author
Forward
0 new messages