Improve StreamLostError handling


Clément Doumouro

Jun 22, 2023, 4:29:02 AM
to Pika
Hi, 

I'm currently following the asynchronous consumer example, and I'm trying to trigger reconnection only when one of a given set of recoverable errors occurs.

Sadly, the underlying errors are wrapped as strings inside the StreamLostError, which prevents me from handling them correctly.

My code looks like this:

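from pika.adapters.base_connection import BaseConnection
from pika.adapters.select_connection import SelectConnection


class MessageConsumer: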
    _RECOVERABLE_ERRORS = (..., ...)

    def __init__(
        self,
       
    ):
        self._should_reconnect = False
        self._closing = False
       
    @property
    def should_reconnect(self) -> bool:
        return self._should_reconnect

    def connect(self):
        self._connection = SelectConnection(
            parameters=...,
            on_open_callback=self._on_connection_open,
            on_open_error_callback=self._on_connection_open_error,
            on_close_callback=self._on_connection_closed,
        )

    def stop(self):
        ...

    def _on_connection_open(self, _unused_connection: BaseConnection):
        ...

    def _on_connection_open_error(
        self, _unused_connection: BaseConnection, err: Exception
    ):
        if isinstance(err, self._RECOVERABLE_ERRORS):
            self._trigger_reconnect()
        self.stop()

    def _on_connection_closed(
        self, _unused_connection: BaseConnection, reason: Exception
    ):
        self._channel_ = None
        if self._closing:  # The connection was closed on purpose
            self._connection.ioloop.stop()
        else:
            if isinstance(reason, self._RECOVERABLE_ERRORS):
                self._trigger_reconnect()
            self.stop()

    def _trigger_reconnect(self):
        self._should_reconnect = True

Would you accept a contribution in which I update StreamLostError to look a bit like AMQPConnectorPhaseErrorBase and wrap the exception that triggered it? This would make it possible to retrieve the original exception and perform proper error handling.

The question is somewhat related to issue 1390, but I want to achieve something much simpler, since I'm not interested in getting the error trace, just the original error type.
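
A minimal sketch of the idea, not Pika's current API: the StreamLostError class below is a hypothetical stand-in defined locally, the `exception` attribute name is an assumption, and the tuple of recoverable errors is only a placeholder. It shows how keeping the original exception object would let a close callback check its type.

```python
from typing import Optional


class StreamLostError(Exception):
    """Hypothetical stand-in for Pika's StreamLostError.

    Assumption: it stores the triggering exception in an `exception`
    attribute instead of only formatting it into the message string.
    """

    def __init__(self, message: str, exception: Optional[BaseException] = None):
        super().__init__(message)
        self.exception = exception


# Placeholder set of recoverable errors, chosen only for illustration.
RECOVERABLE_ERRORS = (ConnectionResetError, TimeoutError)


def is_recoverable(reason: BaseException) -> bool:
    """Return True if `reason`, or the exception it wraps, is recoverable."""
    if isinstance(reason, StreamLostError) and reason.exception is not None:
        # Unwrap to the original error so its type can be inspected.
        reason = reason.exception
    return isinstance(reason, RECOVERABLE_ERRORS)


lost = StreamLostError("Stream connection lost", ConnectionResetError(104, "reset"))
print(is_recoverable(lost))  # True
print(is_recoverable(StreamLostError("Stream connection lost")))  # False
```

With something like this, `_on_connection_closed` could call `is_recoverable(reason)` instead of checking `reason` directly with `isinstance`.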

Thanks for your help!
Clément

Luke Bakken

Jun 27, 2023, 9:44:41 AM
to Pika
Hello!

Yes, please feel free to open a pull request and I will review it. Thank you!

Clément Doumouro

Jul 2, 2023, 1:54:01 PM
to Pika
Perfect, I'll try to do it this week!