Python Threadpool doesn't seem to resolve Stream Connection Lost error/missed heartbeats from client


Paul Lam

Jan 29, 2024, 10:01:29 PM
to Pika
In my code, the main thread handles short tasks (under 5 seconds). If a task runs longer than 5 seconds, a worker thread steps in and runs it instead. Tasks can last up to 10 minutes. The RabbitMQ server heartbeat timeout is 240 seconds; changing it is a last resort for me, since a bunch of other consumers use the same server. Here is my approach:

import functools
import signal
import time
from concurrent.futures import ThreadPoolExecutor


class MainThreadTriggerTimeoutError(Exception):
    """Raised by the SIGALRM handler when a task runs too long on the main thread."""


class Consumer(EventProcessor):
    def setup(self, routing_key):  # method name is illustrative; the snippet omitted it
        # One worker thread for tasks that outgrow the main thread's 5 s budget
        self.thread_pool_executor = ThreadPoolExecutor(max_workers=1)
        # (... skipping the queue_declare part)
        self.rabbit_channel.queue_bind(
            queue=self.rabbit_queue_name,
            exchange=self.internal_rabbit_exchange_name,
            routing_key=routing_key,
            callback=self.handle_queue_bound,
        )
        self.rabbit_channel.basic_qos(prefetch_count=3)
        self.rabbit_channel.basic_consume(
            on_message_callback=self.on_message, queue="my_queue"
        )

    def on_message(self, channel, method, header, body):
        try:
            # First attempt: run the task on the main (I/O loop) thread
            self.handle_event_delivery(channel, method, header, body, True)
        except MainThreadTriggerTimeoutError:
            # Took too long: rerun the delivery on the worker thread
            handle_event_delivery_cb = functools.partial(
                self.handle_event_delivery, channel, method, header, body, False
            )
            future = self.thread_pool_executor.submit(handle_event_delivery_cb)

    def handle_event_delivery(self, channel, method, header, body, is_main_thread):
        if is_main_thread:
            # 'event' was undefined in the snippet; the message body is passed instead
            main_thread_trigger_timeout_handler_cb = functools.partial(
                main_thread_trigger_timeout_handler, event=body
            )
            signal.signal(signal.SIGALRM, main_thread_trigger_timeout_handler_cb)
            signal.alarm(5)
        time.sleep(10 if is_main_thread else 900)  # simulating task duration


def main_thread_trigger_timeout_handler(signum, stack_frame, event):
    """Raise a MainThreadTriggerTimeoutError exception"""
    raise MainThreadTriggerTimeoutError(
        "[Main Thread] Trigger took longer than {0} seconds to complete, "
        "using a worker thread to run the event to prevent dropped server "
        "heartbeats. Event: {1}".format(5, event)
    )

My code still leads to connection errors at times: "[Errno 104] Connection reset by peer" on the client, and "missed heartbeats from client" in RabbitMQ. I thought that, for a consumer, if a worker thread handles the longer tasks, the main thread stays free to send heartbeats to the RabbitMQ server. Am I missing something? Or is my code somehow blocking the main thread from sending heartbeats to the server? According to the consumer logs, the main thread and the worker thread do seem to run concurrently. Is a thread pool not a viable option?
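One detail in the snippet above is worth checking: as posted, signal.alarm(5) is never cancelled. If a task finishes in under 5 seconds, the pending SIGALRM still fires later and raises MainThreadTriggerTimeoutError wherever the main thread happens to be at that moment, possibly inside pika's I/O loop. Below is a minimal sketch (run_with_main_thread_timeout is a hypothetical helper, not part of pika) that always disarms the timer in a finally block. It is Unix-only and must be called from the main thread, because Python runs signal handlers only there. It uses signal.setitimer so the example can use fractional seconds; signal.alarm(0) cancels a whole-second alarm the same way.

```python
import signal
import time


class MainThreadTriggerTimeoutError(Exception):
    """Raised when a task exceeds its main-thread time budget."""


def _raise_timeout(signum, frame):
    raise MainThreadTriggerTimeoutError("task exceeded its main-thread budget")


def run_with_main_thread_timeout(task, timeout_s):
    """Run task() on the main thread; raise if it takes longer than timeout_s.

    The timer is disarmed in 'finally', so a task that finishes early cannot
    leave a stale alarm that fires later somewhere else in the program.
    """
    previous = signal.signal(signal.SIGALRM, _raise_timeout)
    signal.setitimer(signal.ITIMER_REAL, timeout_s)  # like signal.alarm(), but float seconds
    try:
        return task()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # disarm; same effect as signal.alarm(0)
        if previous is not None:
            signal.signal(signal.SIGALRM, previous)  # restore the old handler


print(run_with_main_thread_timeout(lambda: "done", 0.2))
time.sleep(0.3)  # no stale alarm fires here, because it was disarmed

try:
    run_with_main_thread_timeout(lambda: time.sleep(1), 0.2)
except MainThreadTriggerTimeoutError:
    print("timed out, hand the task to the worker thread")
```

The design choice here is simply that arming and disarming happen in one place, so every exit path (normal return, timeout, or any other exception) leaves no timer running.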

Luke Bakken

Jan 30, 2024, 11:32:37 AM
to Pika
>  Or is my code somehow blocking the main thread from sending the heartbeat to the server? 

Yes, that's most likely it. Without a complete set of code I can't say for sure.
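To illustrate what blocking the main thread means for heartbeats, here is a toy, single-threaded loop in plain Python (not pika's API; run_loop, long_task and largest_gap are hypothetical names) that records a "heartbeat" on every iteration. When the long task runs inside the message callback, no heartbeat can be recorded until it returns. When the callback only submits the task to a ThreadPoolExecutor and returns, the heartbeats keep their normal spacing. With pika, the same applies to every callback the I/O loop dispatches.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_loop(on_message, duration_s=0.5, tick_s=0.05):
    """Toy I/O loop: records a 'heartbeat' each iteration, dispatches one message."""
    start = time.monotonic()
    heartbeats = []
    delivered = False
    while time.monotonic() - start < duration_s:
        heartbeats.append(time.monotonic() - start)  # stands in for sending a heartbeat
        if not delivered:
            on_message()  # the loop can do nothing else until this returns
            delivered = True
        time.sleep(tick_s)  # stands in for waiting on the socket
    return heartbeats


def long_task():
    time.sleep(0.3)  # stands in for a multi-minute job


def largest_gap(timestamps):
    return max(b - a for a, b in zip(timestamps, timestamps[1:]))


# Long task run inside the callback: the loop stalls.
inline = run_loop(long_task)

# Callback only submits the task and returns: the loop keeps ticking.
with ThreadPoolExecutor(max_workers=1) as pool:
    offloaded = run_loop(lambda: pool.submit(long_task))

print(f"largest heartbeat gap, inline:    {largest_gap(inline):.2f}s")
print(f"largest heartbeat gap, offloaded: {largest_gap(offloaded):.2f}s")
```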

What does RabbitMQ log at the same time as the missed heartbeat message?

You should use the process_data_events method instead of time.sleep to sleep.
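With BlockingConnection, pika 1.x offers connection.process_data_events(time_limit=...) and connection.sleep(duration), both of which keep servicing the connection while waiting, unlike time.sleep(). The idea can be sketched without a broker: split the wait into short slices and service pending events between them. cooperative_sleep and service_events below are hypothetical stand-ins, not pika functions.

```python
import time


def cooperative_sleep(duration_s, service_events, slice_s=0.05):
    """Wait for duration_s, calling service_events() at least every slice_s.

    Toy analogue of sleeping through the connection instead of time.sleep():
    between short slices, pending I/O (heartbeats, incoming frames) gets handled.
    """
    deadline = time.monotonic() + duration_s
    while True:
        service_events()
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return
        time.sleep(min(slice_s, remaining))


calls = []
cooperative_sleep(0.3, lambda: calls.append(time.monotonic()))
print(f"events serviced {len(calls)} times during a 0.3 s wait")
```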

Thanks,
Luke

Paul Lam

Jan 30, 2024, 1:44:33 PM
to Pika
I see, thanks for the help! I'm relieved that it's most likely just the main thread blocking and not something else.

Regarding time.sleep(): judging from the comments in the pika source code, sleeping that way makes the adapter ignore frames sent by the broker. Perhaps that includes the broker's heartbeats? Also, I'm using SelectConnection instead of BlockingConnection, and SelectConnection doesn't seem to provide process_data_events. Is there an equivalent method for SelectConnection?

I'm also wondering what exactly "data events" means.
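Roughly speaking, SelectConnection has no process_data_events because its I/O loop (connection.ioloop.start()) already does that work continuously: reading incoming frames, flushing outgoing data, sending heartbeats and running timers. Those are the "data events". The loop can only do this between callbacks, so a callback must not block; instead of sleeping, it can schedule its follow-up work, for example with connection.ioloop.call_later(delay, callback) in pika 1.x. The sketch below shows the same effect with asyncio's event loop as a stand-in (loop.call_later exists there too; measure and the two handlers are hypothetical names): a handler that calls time.sleep() stalls a periodic heartbeat task, while one that schedules its continuation does not.

```python
import asyncio
import time


async def measure(handler, duration_s=0.5, tick_s=0.05):
    """Run a periodic 'heartbeat' while handler(loop) is dispatched once.

    Returns the largest gap between heartbeats, in seconds.
    """
    loop = asyncio.get_running_loop()
    beats = []

    async def heartbeat():
        while True:
            beats.append(loop.time())
            await asyncio.sleep(tick_s)

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)         # let the first heartbeat run
    loop.call_soon(handler, loop)  # dispatch the "message" as a loop callback
    await asyncio.sleep(duration_s)
    hb.cancel()
    return max(b - a for a, b in zip(beats, beats[1:]))


def blocking_handler(loop):
    time.sleep(0.3)  # blocks the loop: no heartbeat can run for 0.3 s


def scheduled_handler(loop):
    # Defer the follow-up work instead of sleeping; the loop stays responsive.
    loop.call_later(0.3, lambda: None)


print(f"blocking handler:  {asyncio.run(measure(blocking_handler)):.2f}s max gap")
print(f"scheduled handler: {asyncio.run(measure(scheduled_handler)):.2f}s max gap")
```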

For more context, I ack messages with:
        ack_callback = functools.partial(self.ack_message, channel, method.delivery_tag)
        channel.connection.ioloop.add_callback_threadsafe(ack_callback)
the same as in the given sample code. Also, looking at the sample code, I'm wondering why it uses the traditional threading module, when ThreadPoolExecutor already provides an out-of-the-box mechanism for queuing work onto a fixed number of threads.
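Nothing about the pattern requires threading.Thread specifically; a ThreadPoolExecutor should work as well, provided the message callback returns right away and the ack is handed back to the I/O thread, as the add_callback_threadsafe call above does. The toy below models that hand-off with a queue.SimpleQueue standing in for add_callback_threadsafe (ack_message and do_work are hypothetical). It also shows a property of max_workers=1: with prefetch_count=3, up to three deliveries can wait in the executor's queue, and they run one after another.

```python
import functools
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Toy stand-in for ioloop.add_callback_threadsafe: worker threads put callables
# here, and only the loop thread ever executes them.
callbacks = queue.SimpleQueue()
acked = []  # (delivery_tag, thread name) as recorded when the ack runs
loop_thread = threading.current_thread().name


def ack_message(delivery_tag):
    # In real code this would be channel.basic_ack(delivery_tag) on the I/O thread.
    acked.append((delivery_tag, threading.current_thread().name))


def do_work(delivery_tag):
    time.sleep(0.05)  # the long-running task, off the loop thread
    callbacks.put(functools.partial(ack_message, delivery_tag))


pool = ThreadPoolExecutor(max_workers=1)
for tag in (1, 2, 3):         # e.g. three deliveries allowed by prefetch_count=3
    pool.submit(do_work, tag)  # the message callback returns immediately

# Loop thread: run queued callbacks until every delivery has been acked.
while len(acked) < 3:
    try:
        callbacks.get(timeout=0.01)()
    except queue.Empty:
        pass  # a real I/O loop would handle socket events and heartbeats here
pool.shutdown()
print(acked)
```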

Here are the server logs:

=INFO REPORT==== 25-Jan-2024::19:33:57 ===
connection : user 'guest' authenticated and granted access to vhost '/'

=ERROR REPORT==== 26-Jan-2024::02:45:11 ===
closing AMQP connection
missed heartbeats from client, timeout: 240s

=INFO REPORT==== 26-Jan-2024::03:02:48 ===
accepting AMQP connection

=INFO REPORT==== 26-Jan-2024::03:02:48 ===
connection: user 'guest' authenticated and granted access to vhost '/'

=WARNING REPORT==== 26-Jan-2024::03:27:43 ===
closing AMQP connection(vhost: '/', user: 'guest'):
client unexpectedly closed TCP connection <-- I just shut down the consumer here

Cheers,
Paul
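For a sense of scale: the RabbitMQ documentation describes heartbeat frames as being sent roughly every timeout / 2 seconds, with the peer considered unreachable after about two missed heartbeats. Taking that as an assumption, the arithmetic below (function names are hypothetical) estimates how many heartbeat intervals an I/O thread blocked for a given time would miss with the 240 s timeout from the logs: a 5-second task misses none, while a 10-minute task run on the I/O thread would miss five.

```python
TIMEOUT_S = 240  # "missed heartbeats from client, timeout: 240s" in the server log


def heartbeat_interval(timeout_s: float = TIMEOUT_S) -> float:
    # Assumption: heartbeats are sent about every timeout / 2 seconds
    return timeout_s / 2


def missed_heartbeats(block_s: float, timeout_s: float = TIMEOUT_S) -> int:
    """Whole heartbeat intervals that elapse while the I/O thread is blocked."""
    return int(block_s // heartbeat_interval(timeout_s))


def at_risk(block_s: float, timeout_s: float = TIMEOUT_S) -> bool:
    # Assumption: about two missed heartbeats lead the broker to close the connection
    return missed_heartbeats(block_s, timeout_s) >= 2


for block_s in (5, 10, 600):
    print(f"{block_s:>4}s blocked: {missed_heartbeats(block_s)} missed, at risk: {at_risk(block_s)}")
```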



Luke Bakken

Apr 5, 2024, 6:41:36 PM
to Pika
Hi Paul,

I can assist you further if you can provide a complete code sample I can run.
