It seems I was wrong.
`BlockingConnection` calls the `callback` function for one message at a time and waits until that call finishes before dispatching the next one.
I tried something like this:
# One lock per id, created lazily the first time that id is looked up.
LOCKS = defaultdict(Lock)

# Register the thread-spawning wrapper as the consumer callback.
# auto_ack=True: messages are acknowledged on delivery, not after handling.
channel.basic_consume(
    on_message_callback=threaded_general_callback,
    queue=rockwell_queue.method.queue,
    auto_ack=True,
)
def general_callback(ch, method, properties, body):
    """Handle one consumed message while holding the lock for its id.

    The JSON ``body`` must contain an ``"id"`` key and may contain a
    ``"timeout"`` key (seconds, default 3600) bounding how long to wait for
    the lock registered in ``LOCKS`` under that id.

    Args:
        ch: Channel the message arrived on; its connection is used to
            schedule the reply publish.
        method: Delivery metadata (unused here).
        properties: Message properties; ``reply_to`` names the reply queue.
        body: JSON-encoded message payload.

    Raises:
        KeyError: If the payload has no ``"id"`` key.
        ValueError: If the payload is not valid JSON, or ``id``/``timeout``
            cannot be converted to ``int``.
    """
    parameters = json.loads(body)
    # Bind to ``plc_id`` (the name the lock lookup uses) instead of ``id``,
    # which was never read and shadowed the builtin.
    plc_id = int(parameters["id"])
    timeout = int(parameters.get("timeout", 3600))

    lock = LOCKS[plc_id]
    if not lock.acquire(timeout=timeout):
        # Could not get the lock in time: the message is dropped without
        # being handled, exactly as in the original flow.
        return

    try:
        # NOTE(review): handle_action is called without arguments, as in the
        # original snippet; presumably the real code passes the parsed
        # parameters -- confirm against its definition.
        result = handle_action()
        if result is not None and properties.reply_to:
            # This function runs in a worker thread, so the publish is handed
            # to the connection via add_callback_threadsafe instead of being
            # called on the channel directly from here.
            ch.connection.add_callback_threadsafe(
                functools.partial(
                    ch.basic_publish,
                    exchange="",
                    routing_key=properties.reply_to,
                    body=json.dumps(result).encode(),
                )
            )
    finally:
        lock.release()
def threaded_general_callback(ch, method, properties, body):
    """Run ``general_callback`` for this message in a newly started thread.

    Returns as soon as the thread has been started, without waiting for the
    handler to finish.
    """
    worker = Thread(
        target=general_callback,
        args=(ch, method, properties, body),
    )
    worker.start()
Do you think the above logic with threads makes sense, or would it be better to use `SelectConnection`?