Hi All,
I am trying to write an airflow job which publishes a message and closes the connection.
I am having an issue where when I publish a message to an exchange and immediately kill the pod, the message doesn't get delivered. Whereas if I add time.sleep and wait for 30 seconds, it successfully gets delivered.
I am using a blocking channel and running process_data_events(time_limit=1) on a thread to handle events. I am also using thread safe publish.
Publish message -
def publish_message(self, exchange_name: str, routing_key: str, message: QueueMessage):
self._validate_publish_message_input(exchange_name, routing_key, message)
exchange_name = self._fix_exchange_name(exchange_name)
self._declare_exchange_threadsafe(exchange_name)
self._send_message(exchange_name, routing_key, message)
def _send_message(self, exchange_name: str, routing_key: str, message: QueueMessage):
message_bytes = message.SerializeToString()
self._send_message_threadsafe(exchange_name, routing_key, message_bytes)
def _send_message_threadsafe(self, exchange_name, routing_key, message):
channel = self._connection_manager.get_channel()
connection = channel.connection
connection.add_callback_threadsafe(
lambda: channel.basic_publish(exchange_name, routing_key, message)
)
def _declare_exchange_threadsafe(self, exchange_name):
channel = self._connection_manager.get_channel()
connection = channel.connection
connection.add_callback_threadsafe(
# declared only if it doesn't exist already
lambda: channel.exchange_declare(exchange_name)
)
Could someone help me with what the issue might be?