pika example rabbitMq

175 views
Skip to first unread message

Patryk Cebrat

unread,
Mar 9, 2023, 4:55:51 AM3/9/23
to rabbitmq-users

Hi Pika,


I have read some post in this group and I don’t understand what is the point of using:

https://github.com/pika/pika/blob/main/examples/basic_consumer_threaded.py


I mean, why we need one additional thread for processing message and calling add_callback_threadsafe where as argument we pass ack method (both activities  in the same new thread for processing).

I wonder how we prevent form closing connection from channel with we consume message, because if processing is longer than heartbeat value we send ack afterwards and it is for me the same case like just consuming in the main thread and then sending ack. 

What is the difference if we run processing and sending ack in separate one thread?


I am new in python and pika so sorry if this question is strange

Luke Bakken

unread,
Mar 9, 2023, 7:43:14 PM3/9/23
to rabbitmq-users
Hello,

It would be best for you to demonstrate what you are trying to accomplish via code that you have written and shared (GitHub is a good option).

You may be interested in this discussion, and the code I wrote to go with it:


Thanks,
Luke

Patryk Cebrat

unread,
Mar 10, 2023, 3:44:41 AM3/10/23
to rabbitmq-users
Hi Luke,

nice to hear back from you, I tried to post on the pika group you manage, but it is automatically deleted :(
My code is production and proprietary and I can't share it. I have no experience in python, but we have one service for photo processing because of the large number of libraries. here is my problem:

I receive a message from the channel and process it. However, sometimes when making a request to Azure, I may get a very delayed response. I'm wondering how not to end the connection, I have heartbeat set to 300s and from the documentation I know that the server will send heartbeats every hertbeat/2 = 150s. However, if I process the message for more than 300s, the connection is closed and I can't send the ack. Everything happens at the main thread level. Would separating this, like in your github example, into a separate thread help? if so, why, I can't understand what gives us processing in a new thread and sending the ack in the same thread. For me the only difference is that we do it in a new thread instead of the main thread. Please advise
P.S. yes i know, there is still the issue of handling too long response from azure, but let's focus on rabbitMq and don't close the connection if we exceed hertbeat.

Patryk Cebrat

unread,
Mar 10, 2023, 3:51:06 AM3/10/23
to rabbitmq-users
btw. linked discussion doesn't get me closer to my goal

Luke Bakken

unread,
Mar 10, 2023, 8:44:37 AM3/10/23
to rabbitmq-users
My code is production and proprietary and I can't share it.

Then I usually ask that you create your own program that does the same thing, and share it. It's much easier to work with actual code than descriptions of code.
 
I receive a message from the channel and process it. However, sometimes when making a request to Azure, I may get a very delayed response. I'm wondering how not to end the connection, I have heartbeat set to 300s and from the documentation I know that the server will send heartbeats every hertbeat/2 = 150s. However, if I process the message for more than 300s, the connection is closed and I can't send the ack. Everything happens at the main thread level. Would separating this, like in your github example, into a separate thread help?

Yes, this would fix the problem.
 
if so, why, I can't understand what gives us processing in a new thread and sending the ack in the same thread. For me the only difference is that we do it in a new thread instead of the main thread. Please advise

Please re-read this code:


When a message is consumed, the following happens:
Note that the example code leaves out error-handling and thread-handling (cleanup) details.

Thanks,
Luke

Artur Wroblewski

unread,
Mar 10, 2023, 10:07:05 AM3/10/23
to rabbitm...@googlegroups.com
On Fri, Mar 10, 2023 at 05:44:37AM -0800, Luke Bakken wrote:
[...]
> When a message is consumed, the following happens:
>
> - A thread is started to process the message -
> https://github.com/pika/pika/blob/main/examples/basic_consumer_threaded.py#L43.
> The new thread is necessary because, as you have seen, if the work is done
> on the main thread then Pika's I/O loop is blocked
> (https://stackoverflow.com/a/75691878/1466825)

IMHO, it is probably worth adding, that if a processing thread performs CPU
intensive calculations without releasing Python's GIL, then all other
Python threads will be blocked and Pika won't receive or respond to
a connection heartbeat.

Best regards,

Artur

--
https://mortgage.diy-labs.eu/

Patryk Cebrat

unread,
Mar 11, 2023, 1:55:00 PM3/11/23
to rabbitmq-users
Thank you very much for the clarification, I really appreciate it :) !
I have read how event loop and asyncio and gil in python work, but I would like to ask about two things, rather it is general python knowledge:

1. Will a blocking HTTP request using azure client in a new thread would block I/O loop? (blocking request is in the new processing thread. Will it block the i/o loop)?

2. How to create only one additional thread all the time, and not consume a lot of memory, I mean that callback will be triggered after sending ack(right?). Later in your example callback adds new threads all the time, I can't join that threads because it will block I/O loop. Will an executor with size = 1 be ok? Thread after ack has already finished its work, so it seems sensible.

Luke Bakken

unread,
Mar 14, 2023, 9:37:22 AM3/14/23
to rabbitmq-users
1. Will a blocking HTTP request using azure client in a new thread would block I/O loop? (blocking request is in the new processing thread. Will it block the i/o loop)?

You can test this out yourself by running this code and changing the sleep to 10 minutes. Since the log level is set to DEBUG, you will be able to see heartbeat messages being processed. 
 
2. How to create only one additional thread all the time, and not consume a lot of memory, I mean that callback will be triggered after sending ack(right?). Later in your example callback adds new threads all the time, I can't join that threads because it will block I/O loop. Will an executor with size = 1 be ok? Thread after ack has already finished its work, so it seems sensible.

Give it a try yourself! You should set prefetch to 1 to only consume one message at a time - https://github.com/pika/pika/blob/main/examples/basic_consumer_threaded.py#L65-L68

Luke 

Patryk Cebrat

unread,
Mar 14, 2023, 10:45:54 AM3/14/23
to rabbitmq-users
 "prefetch count" value using the basic.qos method. The value defines the max number of unacknowledged deliveries that are permitted on a channel.
I read it in the rabbitMq documentation and debugging the code snippet by hand, I know it, just wanted to make sure. Like i siad i am new in python and not everything is clear. But now I see after reading about asyncio, event loop and threads.

Thanks
Reply all
Reply to author
Forward
0 new messages