For my code, I dedicate my main thread to handle shorter tasks (less than 5 seconds). If it runs more than 10 seconds, then my worker thread would step in and run it. Tasks can last for up to 10 minutes. Rabbit server heartbeat timeout is 240 seconds (probably the last thing I'd do is to change the server heartbeat timeout unless it's really necessary, since there are a bunch of consumers). With the approach below:
class Consumer(EventProcessor):
self.thread_pool_executor = ThreadPoolExecutor(max_workers=1)
(... skipping the queue_declare part)
self.rabbit_channel.queue_bind(
queue=self.rabbit_queue_name,
exchange=self.internal_rabbit_exchange_name,
routing_key=routing_key,
callback=self.handle_queue_bound,
)
self.rabbit_channel.basic_qos(prefetch_count=3)
self.rabbit_channel.basic_consume(
on_message_callback=self.on_message, queue="my_queue"
)
def on_message(self, channel, method, header, body):
try:
self.handle_event_delivery(channel, method, header, body, True)
except MainThreadTriggerTimeoutError:
handle_event_delivery_cb = functools.partial(self.handle_event_delivery, channel, method, header, body, False)
future = self.thread_pool_executor.submit(handle_event_delivery_cb)
def handle_event_delivery(self, channel, method, header, body, is_main_thread):
if is_main_thread:
main_thread_trigger_timeout_handler_cb = functools.partial(main_thread_trigger_timeout_handler, event=event)
signal.signal(signal.SIGALRM, main_thread_trigger_timeout_handler_cb)
signal.alarm(5)
time.sleep(10 if is_main_thread else 900) # simulating task duration
def main_thread_trigger_timeout_handler(signum, stack_frame, event):
"""Raise a MainThreadTriggerTimeoutError exception"""
raise MainThreadTriggerTimeoutError(
"[Main Thread] Trigger took longer than {0} " "seconds to complete, using a worker thread to run the event to prevent dropped server heartbeats. Event: {1}" .format(5, event)
)
My code is still leading to connection error at times: [Errno 104] Connection reset by peer, and missed heartbeat from client in rabbitmq. I thought, for consumer, if worker thread deals with longer tasks, then main thread can deal with sending heartbeat to the rabbitmq server? Am I missing something? Or is my code somehow blocking the main thread from sending the heartbeat to the server? It seems like main thread and worker thread are running concurrently as of the logs from consumer. Is thread pool not a viable option?