Fast publisher issue

Pradyumna Mahajan

Aug 5, 2024, 9:12:41 AM
to Pika
Hi All,
I am trying to write an Airflow job that publishes a message and then closes the connection.

My issue is that when I publish a message to an exchange and immediately kill the pod, the message doesn't get delivered. If I add time.sleep and wait for 30 seconds, it is delivered successfully.

I am using a blocking channel and running process_data_events(time_limit=1) on a thread to handle events. I am also publishing in a thread-safe way.
Publish message code:

    def publish_message(self, exchange_name: str, routing_key: str, message: QueueMessage):
        self._validate_publish_message_input(exchange_name, routing_key, message)
        exchange_name = self._fix_exchange_name(exchange_name)
        self._declare_exchange_threadsafe(exchange_name)
        self._send_message(exchange_name, routing_key, message)

    def _send_message(self, exchange_name: str, routing_key: str, message: QueueMessage):
        message_bytes = message.SerializeToString()
        self._send_message_threadsafe(exchange_name, routing_key, message_bytes)

    def _send_message_threadsafe(self, exchange_name, routing_key, message):
        channel = self._connection_manager.get_channel()
        connection = channel.connection
        connection.add_callback_threadsafe(
            lambda: channel.basic_publish(exchange_name, routing_key, message)
        )

    def _declare_exchange_threadsafe(self, exchange_name):
        channel = self._connection_manager.get_channel()
        connection = channel.connection
        connection.add_callback_threadsafe(
            # declared only if it doesn't exist already
            lambda: channel.exchange_declare(exchange_name)
        )

Could someone help me with what the issue might be?

Luke Bakken

Aug 6, 2024, 10:01:52 AM
to Pika
Hello,

You need to be sure you're using publisher confirmations - https://www.rabbitmq.com/docs/confirms#publisher-confirms

I can't really tell what Pika features you are using from your code snippets.
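
For illustration, here is a minimal sketch of how confirmations could be combined with the thread-safe publish from the question. It rests on a few assumptions: `channel.confirm_delivery()` has already been called on the thread that owns the connection, that same thread keeps calling `process_data_events`, and `publish_and_wait` is a hypothetical helper name, not part of Pika. The point it shows is that `add_callback_threadsafe` only schedules the publish; the caller has to wait until the callback has actually run (and the broker has confirmed) before the process is allowed to exit.

```python
from concurrent.futures import Future


def publish_and_wait(connection, channel, exchange, routing_key, body,
                     mandatory=False, timeout=10.0):
    """Schedule a publish on the connection's thread and block until it ran.

    Assumption: channel.confirm_delivery() was called earlier, so on a
    BlockingChannel basic_publish returns only after the broker confirms
    and raises if the message is nacked.
    """
    done = Future()

    def _publish():
        # Runs on the thread that owns the connection, the next time that
        # thread processes events.
        try:
            channel.basic_publish(exchange, routing_key, body,
                                  mandatory=mandatory)
            done.set_result(True)
        except Exception as exc:  # for example pika.exceptions.NackError
            done.set_exception(exc)

    connection.add_callback_threadsafe(_publish)
    # Raises concurrent.futures.TimeoutError if the callback did not run in
    # time, and re-raises any error from basic_publish.
    return done.result(timeout=timeout)
```

Once `publish_and_wait` returns, the message has been handed to the broker and confirmed, so it should then be safe to stop the event thread and close the connection from the thread that owns it, without relying on a fixed time.sleep.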
