I'm currently following the
asynchronous consumer example and I'm trying to trigger a reconnection only when one of a given set of recoverable errors occurs.
Sadly, errors are wrapped as strings inside the StreamLostError, which prevents me from handling them correctly.
My code looks like this:
class MessageConsumer:
    """Consumer that asks for a reconnection only after recoverable errors.

    The connection callbacks inspect the exception they receive: when it is an
    instance of one of ``_RECOVERABLE_ERRORS`` the ``should_reconnect`` flag is
    raised before the consumer is stopped, so the caller can decide whether to
    start a new consumer.
    """

    # Placeholder from the original snippet: replace the ellipses with the
    # exception classes that should be treated as recoverable.
    _RECOVERABLE_ERRORS = (..., ...)

    def __init__(self):
        """Initialise the consumer in a non-closing, no-reconnect state."""
        self._should_reconnect = False
        self._closing = False
        # Set by connect(); read by _on_connection_closed() to stop the loop.
        self._connection = None
        # Reset to None whenever the connection is reported closed.
        self._channel = None

    @property
    def should_reconnect(self) -> bool:
        """Whether a recoverable error was seen and a reconnect is wanted."""
        return self._should_reconnect

    def connect(self):
        """Create the ``SelectConnection`` and register the lifecycle callbacks."""
        # Stored under the same name that _on_connection_closed() reads back.
        self._connection = SelectConnection(
            parameters=...,
            on_open_callback=self._on_connection_open,
            on_open_error_callback=self._on_connection_open_error,
            on_close_callback=self._on_connection_closed,
        )

    def stop(self):
        """Stop the consumer (body elided in the original snippet)."""
        ...

    def _on_connection_open(self, _unused_connection: BaseConnection):
        """Handle a successfully opened connection (body elided in the snippet)."""
        ...

    def _on_connection_open_error(
        self, _unused_connection: BaseConnection, err: Exception
    ):
        """Handle a failure to open the connection.

        Args:
            _unused_connection: the connection that failed to open (unused).
            err: the exception describing the failure.
        """
        if isinstance(err, self._RECOVERABLE_ERRORS):
            self._trigger_reconnect()
        self.stop()

    def _on_connection_closed(
        self, _unused_connection: BaseConnection, reason: Exception
    ):
        """Handle a closed connection.

        Args:
            _unused_connection: the connection that was closed (unused).
            reason: the exception describing why the connection was closed.
        """
        self._channel = None
        if self._closing:  # The connection was closed on purpose
            self._connection.ioloop.stop()
        elif isinstance(reason, self._RECOVERABLE_ERRORS):
            self._trigger_reconnect()
            self.stop()

    def _trigger_reconnect(self):
        """Record that the caller should reconnect."""
        self._should_reconnect = True
Would you accept a contribution in which I would update the StreamLostError to look a bit like the AMQPConnectorPhaseErrorBase and wrap the exception that triggered it? This would make it possible to retrieve the original exception and perform proper error handling.
The question is somewhat related to
issue 1390, but I want to achieve something much simpler, since I'm not interested in getting the error trace, just the original error type.
Thanks for your help !
Clément