Long publisher recovering


Сергей Посохов

Oct 27, 2023, 4:30:04 AM
to Pika
Hello, everybody! I have a question about the long-running publisher example: https://github.com/pika/pika/blob/main/examples/long_running_publisher.py

Where should I handle exceptions and try to recover the connection if something goes wrong?

There are two places: where process_data_events is called inside the thread, and where I call the publish method.

I tried handling exceptions in both places and restarting the thread, but I get ConnectionWrongStateError on the next messages I try to publish, along with a "Timeout closed before call" error whose origin I can't identify.

Here is the publish code from the main thread:
```
try:
    self._publisher.publish(index, {"message": message_json})
except Exception as exc:
    logging.exception(
        "Exception %s occurred during publishing, restoring",
        type(exc).__name__,
    )
    self._publisher.stop()
    self._publisher.join()
    self._publisher.connect()
    self._publisher.start()
```
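
One thing worth checking in the recovery path above, independent of Pika: a Python `threading.Thread` object can be started only once, so calling `start()` again after `join()` raises `RuntimeError`. The sketch below demonstrates that with plain standard-library threads. The function names are hypothetical, and whether this is related to the errors described in this post is an assumption, not something the post confirms.

```python
import threading


def restart_same_thread():
    """Try to start a finished Thread object a second time."""
    worker = threading.Thread(target=lambda: None)
    worker.start()
    worker.join()
    try:
        worker.start()  # second start() on the same Thread object
    except RuntimeError as exc:
        return str(exc)  # CPython refuses to restart a thread
    return None


def restart_with_new_thread():
    """Recover by building a fresh Thread object for every run."""
    results = []
    for attempt in range(2):
        worker = threading.Thread(target=results.append, args=(attempt,))
        worker.start()
        worker.join()
    return results
```

If the publisher subclasses `threading.Thread`, as the linked example does, recovering from the main thread would therefore mean constructing a new publisher object rather than calling `start()` on the old one.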


And here is the run method from the long publisher class:


```
def run(self):
    logging.info("Started MultipleQueuePublisher thread")
    while self.is_running:
        try:
            self._connection.process_data_events(time_limit=1)
        except pika.exceptions.ConnectionClosedByBroker as eccbb:
            logging.error(
                "Broker closed MultipleQueuePublisher connection %s: %s, %s, restoring.",
                type(eccbb).__name__,
                eccbb.reply_code,
                eccbb.reply_text,
            )
            if self._connection.is_open:
                self._connection.close()
            self.connect()
        except pika.exceptions.AMQPChannelError as achee:
            logging.error(
                "Channel %s error occurred in MultipleQueuePublisher, restoring",
                type(achee).__name__,
            )
            if self._connection.is_open:
                self._connection.close()
            self.connect()
        except pika.exceptions.AMQPConnectionError as acee:
            logging.error(
                "Connection %s error occurred in MultipleQueuePublisher, restoring",
                type(acee).__name__,
            )
            if self._connection.is_open:
                self._connection.close()
            self.connect()
        except (
            pika.adapters.utils.connection_workflow.AMQPConnectorException
        ) as ace:
            logging.error(
                "Error %s occurred during connection process in MultipleQueuePublisher, restoring",
                type(ace).__name__,
            )
            if self._connection.is_open:
                self._connection.close()
            self.connect()
```
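
For comparison, here is a minimal sketch of one way to keep all recovery inside the worker thread, so that only that thread ever touches the connection and the thread itself never needs restarting. It is not the structure of the linked example. `RecoveringWorker`, `connect`, `recoverable`, and `FlakyConnection` are hypothetical names, and `FlakyConnection` is a stand-in that only mimics the `process_data_events(time_limit=...)` call so the sketch runs without a broker.

```python
import logging
import threading
import time


class RecoveringWorker(threading.Thread):
    """Hypothetical worker that owns its connection and rebuilds it on failure."""

    def __init__(self, connect, recoverable, retry_delay=0.0):
        super().__init__(daemon=True)
        self._connect = connect          # hypothetical factory returning a connection
        self._recoverable = recoverable  # tuple of exception types worth retrying
        self._retry_delay = retry_delay
        self._stop_event = threading.Event()
        self.failures = 0

    def run(self):
        connection = None
        while not self._stop_event.is_set():
            try:
                if connection is None:
                    connection = self._connect()  # (re)connect on this thread
                connection.process_data_events(time_limit=0.01)
            except self._recoverable as exc:
                logging.warning("Connection lost (%s), reconnecting", type(exc).__name__)
                connection = None  # drop the dead connection; next pass reconnects
                self.failures += 1
                self._stop_event.wait(self._retry_delay)  # interruptible back-off

    def stop(self):
        self._stop_event.set()


class FlakyConnection:
    """Stand-in connection (not Pika): the first instance fails on its first poll."""

    instances = 0

    def __init__(self):
        FlakyConnection.instances += 1
        self._fails = FlakyConnection.instances == 1

    def process_data_events(self, time_limit=0):
        if self._fails:
            raise ConnectionError("simulated drop")
        time.sleep(time_limit)
```

With Pika, `connect` could plausibly be a function that returns `pika.BlockingConnection(parameters)` and `recoverable` could be `(pika.exceptions.AMQPConnectionError,)`. Other threads would still hand messages over through `add_callback_threadsafe`, as the linked example does, rather than calling channel methods directly.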

Luke Bakken

Nov 15, 2023, 8:37:48 AM
to Pika
Hello,

Please provide a complete code sample that I can run to reproduce this issue.
