When to use threads with SelectConnection


James McKinney

Jul 1, 2023, 1:00:49 PM
to Pika
I had been using BlockingConnection, but this adapter makes it difficult/impossible to survive RabbitMQ restarts. (I've tried catching ConnectionClosedByBroker, but this exception seems to be raised from outside any code I write – I can't catch it. If there is a consistent way to recover from this event, please let me know.)

I am therefore switching to SelectConnection, which offers a callback for this event.

When using BlockingConnection, I had been using a ThreadPoolExecutor, and the on_message callback would submit work to the pool, since some tasks take longer than a reasonable heartbeat. This also enables concurrent processing of messages, allowing me to get better performance with a higher prefetch count.

I've read the documentation/issues/discussions, but it's unclear to me whether I need to use threads with SelectConnection for either (1) heartbeat or (2) concurrency, or both.

Could you clarify?

James McKinney

Jul 1, 2023, 11:00:00 PM
to pika-...@googlegroups.com
Okay, I can confirm that threads are definitely needed for concurrency in SelectConnection (the IO loop blocks on all adapters).

New issue: I'm working out how to wait for the threads to terminate, before closing the channel and connection. For example, in the basic.cancel-ok callback, I run:

self.executor.shutdown(cancel_futures=True)
self.channel.close()

ThreadPoolExecutor.shutdown() has wait=True by default, so it blocks until all threads terminate. At the end of most of my threads, I use connection.ioloop.add_callback_threadsafe to call channel.basic_ack() in the callback. From debugging, I can see that:

1. add_callback_threadsafe() returns, in the thread
2. the thread terminates
3. executor.shutdown() returns, in the basic.cancel-ok callback
4. channel.close() is called, in the basic.cancel-ok callback
5. the thread-safe callback runs, but the channel is now closed

Is there any way to ensure that the callback runs, before the channel is closed?


--
You received this message because you are subscribed to the Google Groups "Pika" group.
To unsubscribe from this group and stop receiving emails from it, send an email to pika-python...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/pika-python/9f1f3111-4ed9-4ed4-860e-1ad250287025n%40googlegroups.com.


--

James McKinney

Head of Technology

+1-514-247-0223 | @mckinneyjames | timezone: EST


💙💛 Learn how we are supporting Ukraine's Reconstruction here #URC2023 💙💛

www.open-contracting.org | follow us @opencontracting

Nough You

Jul 2, 2023, 3:48:13 AM
to pika-...@googlegroups.com
isn't that why try: except: finally: exists? try finally. 

Gavin M. Roy

Jul 2, 2023, 10:09:49 AM
to pika-...@googlegroups.com
You should never use threads. The whole idea of using an async client is to dispatch your code on the IOLoop, not work around it.

James McKinney

Jul 2, 2023, 11:12:56 AM
to pika-...@googlegroups.com
> The whole idea of using an async client is to dispatch your code on the IOLoop, not work around it.

Maybe I'm missing something, but to test, I wrote a simple consumer callback that sleeps for 5 seconds and logs messages:

def consumer_callback(channel, method, properties, body):
    logger.info("Sleep")
    time.sleep(5)
    logger.info("Wake!")

I then created 10 messages in RabbitMQ. Using `basic_consume` on the SelectConnection, the log messages appear as:

Sleep
Wake!
Sleep
Wake!
etc.

with five seconds elapsing between "Sleep" and "Wake!", so about 50 seconds total.

If instead I use a consumer callback that submits the messages to a thread pool (whose callback is otherwise identical), I see:

Sleep
Sleep
...
Wake!
Wake!
...

With 5 seconds between the "Sleep" messages and the "Wake!" messages, so 5 seconds in total.

If there's a way to get the same concurrency without manually creating threads, I would love to know.
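
The comparison above can be reproduced without RabbitMQ. The following is a minimal sketch, not pika code: `handle` stands in for the consumer callback, the delay is shortened to 0.1 seconds, and `run_serial` and `run_pooled` are hypothetical helper names. It only illustrates why a blocking callback serializes the work while a thread pool lets the waits overlap.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.1  # shortened stand-in for the 5-second sleep in the test above


def handle(body):
    """Stand-in for the consumer callback: blocks the calling thread."""
    time.sleep(DELAY)
    return body


def run_serial(bodies):
    """Call the handler inline, as a blocking callback on the IO loop would."""
    start = time.monotonic()
    results = [handle(body) for body in bodies]
    return results, time.monotonic() - start


def run_pooled(bodies, workers):
    """Submit each message to a thread pool so the sleeps overlap."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(handle, bodies))
    return results, time.monotonic() - start


if __name__ == "__main__":
    messages = list(range(10))
    _, serial = run_serial(messages)
    _, pooled = run_pooled(messages, workers=10)
    print(f"serial: {serial:.2f}s, pooled: {pooled:.2f}s")
```

The serial run takes roughly the number of messages times the delay, while the pooled run takes roughly one delay when there are as many workers as messages.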

> isn't that why try: except: finally: exists? try finally. 

To answer my question, I need to do:

self.executor.shutdown(cancel_futures=True)
self.connection.ioloop.call_later(0, self.channel.close)

This will ensure the (pending) thread-safe callback is run before the (newly-scheduled) channel.close.
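
The ordering issue can be illustrated with a toy model. This is a sketch under stated assumptions, not pika's implementation: `ToyLoop`, `ToyChannel`, `schedule`, and `shut_down` are hypothetical names, and the loop simply runs queued callbacks in FIFO order. A single worker thread is used so that the acks are queued in a predictable order.

```python
import collections
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyLoop:
    """Hypothetical stand-in for an IO loop: runs queued callbacks in FIFO order."""

    def __init__(self):
        self._callbacks = collections.deque()
        self._lock = threading.Lock()

    def schedule(self, callback):
        """Queue a callback from any thread."""
        with self._lock:
            self._callbacks.append(callback)

    def run_pending(self):
        """Run queued callbacks, oldest first, until none are left."""
        while True:
            with self._lock:
                if not self._callbacks:
                    return
                callback = self._callbacks.popleft()
            callback()


class ToyChannel:
    """Hypothetical channel that records what happens to each ack."""

    def __init__(self):
        self.closed = False
        self.events = []

    def basic_ack(self, tag):
        self.events.append(f"ack {tag} failed" if self.closed else f"ack {tag} ok")

    def close(self):
        self.closed = True
        self.events.append("closed")


def shut_down(defer_close):
    """Simulate the basic.cancel-ok callback with or without a deferred close."""
    loop, channel = ToyLoop(), ToyChannel()
    executor = ThreadPoolExecutor(max_workers=1)
    for tag in (1, 2):
        # Each worker finishes by queueing its ack on the loop.
        executor.submit(loop.schedule, lambda tag=tag: channel.basic_ack(tag))
    executor.shutdown(wait=True)  # workers are done; acks are queued but not run
    if defer_close:
        loop.schedule(channel.close)  # queued behind the pending acks
    else:
        channel.close()  # runs immediately, ahead of the pending acks
    loop.run_pending()
    return channel.events
```

In this model, closing directly reproduces the five-step sequence described earlier in the thread (the acks find the channel closed), while queueing the close behind the pending callbacks lets the acks run first.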


James McKinney

Jul 2, 2023, 11:13:46 AM
to pika-...@googlegroups.com
I forgot to mention: In my test I set prefetch_count to 10, so it gets all the messages at once.

Gavin M. Roy

Jul 2, 2023, 11:20:28 AM
to pika-...@googlegroups.com
time.sleep will block the IOLoop and is generally a bad idea in async code. Depending on your needs, you might want to consider using the AsyncIO connection adapter, then you can use asyncio.sleep: https://docs.python.org/3/library/asyncio-task.html#asyncio.sleep
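
A minimal sketch of the difference, using only the standard library and no pika code: `handle` and `consume` are hypothetical names standing in for message processing, and the delay is shortened to 0.01 seconds. Because `asyncio.sleep` yields to the event loop, every handler starts before any of them finishes, all on a single thread.

```python
import asyncio


async def handle(body, events, delay=0.01):
    """Hypothetical message handler that waits without blocking the loop."""
    events.append(f"Sleep {body}")
    await asyncio.sleep(delay)  # yields to the event loop; time.sleep would not
    events.append(f"Wake! {body}")


async def consume(bodies):
    """Run one handler per message concurrently on a single thread."""
    events = []
    await asyncio.gather(*(handle(body, events) for body in bodies))
    return events


if __name__ == "__main__":
    for event in asyncio.run(consume(range(3))):
        print(event)
```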


James McKinney

Jul 2, 2023, 11:30:45 AM
to pika-...@googlegroups.com
Aha. I just used sleep here to try to simulate a CPU-heavy computation. However, if I simulate that with something like:

for i in range(100000000):
    pass

I see that the messages are indeed processed concurrently.

Thanks!

Gavin M. Roy

Jul 2, 2023, 11:39:07 AM
to pika-...@googlegroups.com
If you're thinking of doing compute-heavy work on an IOLoop, you should consider using the ThreadPoolExecutor for that processing, with the IOLoop staying in the foreground. See https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

James McKinney

Jul 2, 2023, 12:01:49 PM
to pika-...@googlegroups.com
At first you said threads shouldn't be used, but they are fine to use for compute heavy work?

If I'm using the SelectConnection, would I use the ThreadPoolExecutor from concurrent.futures instead?

Or, do you recommend AsyncioConnection over SelectConnection, anyway?

Gavin M. Roy

Jul 2, 2023, 12:11:49 PM
to pika-...@googlegroups.com
Sorry I was not clear.

1. Threads should not be intermixed in the actual pika client usage. It's very easy to get threading wrong and have incorrect protocol behaviors.
2. I would absolutely use asyncio over the IOLoop in the SelectConnection. There's a whole ecosystem around asyncio in Python 3 that you benefit from. If asyncio had existed when I wrote SelectConnection, I wouldn't have written SelectConnection or its IOLoop.
3. The idea behind the threaded executor is you let the asyncio loop manage the background execution of CPU intensive tasks in a way that doesn't interrupt or change the context of IOLoop and the things running on it. 

More context: AMQP is a bidirectional RPC protocol. To do it right, the client library should be able to respond in real time to RPCs from the server to the client. It also expects strict sequencing of RPC frames. Introducing threads that are aware of pika connections and channels introduces the ability to intersperse RPC frames in a way that breaks the protocol. If instead you receive a message, for example, and then dispatch the work via a thread pool executor managed by the IOLoop, the client usage stays non-threaded, while compute-heavy work is processed "in the background" and returns from the function when complete, as if it were just a normal execution of a function on the ioloop.
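
The pattern described above can be sketched with the standard library alone. This is an illustration under assumptions, not pika code: `crunch`, `on_message`, and `main` are hypothetical names, and the "ack" is only recorded in a list where a real consumer would call `channel.basic_ack`. The point is that the work runs on a pool thread while everything that would touch the channel runs on the event loop's thread.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def crunch(n):
    """Hypothetical CPU-heavy task; returns its result and the thread it ran on."""
    return sum(range(n)), threading.get_ident()


async def on_message(body, executor, acked):
    """Hypothetical handler: offload the work, then ack back on the loop thread."""
    loop = asyncio.get_running_loop()
    # The coroutine pauses here while the pool does the work; the loop stays free.
    result, worker_thread = await loop.run_in_executor(executor, crunch, body)
    # Execution resumes on the loop's thread, where a real consumer would call
    # channel.basic_ack. Here the "ack" is only recorded.
    acked.append((body, result, worker_thread, threading.get_ident()))


async def main(bodies):
    """Process all messages concurrently and return the recorded acks."""
    acked = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        await asyncio.gather(*(on_message(b, executor, acked) for b in bodies))
    return acked


if __name__ == "__main__":
    for body, result, worker, loop_thread in asyncio.run(main([10, 100])):
        print(body, result, worker != loop_thread)
```

No worker thread ever sees the connection or channel, which is what keeps frame sequencing in the hands of a single thread.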


James McKinney

Jul 3, 2023, 5:52:01 PM
to pika-...@googlegroups.com
Thank you for the explanation!

In case anyone is interested, I wrote a package for my Pika wrappers (to avoid repetitive boilerplate across my projects). https://yapw.readthedocs.io/en/latest/

James

