RabbitMQ quorum queues - redriven message


Marcin Kowalczyk

Oct 14, 2024, 12:25:31 PM
to Jasmin SMS Gateway
Hi,

I wonder if anybody uses quorum queues to avoid losing data when RabbitMQ crashes.

I'm doing some extensive testing and everything looks fine until, for whatever reason, a submit.sm.connector message gets requeued.

In that case the Jasmin AMQP worker crashes, does not recover, and stops processing AMQP messages (no matter whether it's a pub or sub action) until the redelivered message is deleted from the faulty queue.

The only difference in the message payload is the presence of the x-delivery-count header when the message has been delivered more than once. I think a kind of workaround (at the cost of losing messages) is to set delivery limit = 1.

It's easy to reproduce: push some messages to a stopped connector, retrieve them without ack'ing, and start the connector again.

I get the following error in the log, but I was unable to find the module that throws it.

Unhandled Error
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/twisted/python/log.py", line 96, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/usr/local/lib/python3.11/site-packages/twisted/python/log.py", line 80, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/usr/local/lib/python3.11/site-packages/twisted/python/context.py", line 117, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/local/lib/python3.11/site-packages/twisted/python/context.py", line 82, in callWithContext
    return func(*args, **kw)
--- <exception caught here> ---
  File "/usr/local/lib/python3.11/site-packages/twisted/internet/posixbase.py", line 482, in _doReadOrWrite
    why = selectable.doRead()
  File "/usr/local/lib/python3.11/site-packages/twisted/internet/tcp.py", line 248, in doRead
    return self._dataReceived(data)
  File "/usr/local/lib/python3.11/site-packages/twisted/internet/tcp.py", line 253, in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "/usr/local/lib/python3.11/site-packages/txamqp/protocol.py", line 189, in dataReceived
    frame = self._unpack_frame(packet)
  File "/usr/local/lib/python3.11/site-packages/txamqp/protocol.py", line 162, in _unpack_frame
    payload = Frame.DECODERS[frame_type].decode(self.spec, c)
  File "/usr/local/lib/python3.11/site-packages/txamqp/connection.py", line 185, in decode
    properties[str(f.name)] = c.decode(f.type)
  File "/usr/local/lib/python3.11/site-packages/txamqp/codec.py", line 90, in decode
    return getattr(self, "decode_" + field_type)()
  File "/usr/local/lib/python3.11/site-packages/txamqp/codec.py", line 248, in decode_table
    raise ValueError(repr(item_type))
builtins.ValueError: b'l'
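
For anyone reading the traceback: the exception is raised while the message headers (an AMQP field table) are being decoded, and b'l' is the one-byte type tag that was not recognised. In RabbitMQ's field-table encoding, 'l' marks a signed 64-bit integer, which is presumably how the x-delivery-count value arrives. The sketch below is not txamqp code and not a patch for it. It is a standalone illustration, using only the standard library, of what a table decoder has to do with that tag. The function names decode_field_table and encode_long_header are hypothetical, and only a few type tags are handled.

```python
import struct


def decode_field_table(data: bytes) -> dict:
    """Decode the body of an AMQP 0-9-1 field table.

    `data` is the table content without its 4-byte length prefix.
    Only a handful of type tags are handled; this is an illustration,
    not a complete codec.
    """
    pos = 0
    table = {}
    while pos < len(data):
        # Field name: short string (1-byte length, then the bytes).
        name_len = data[pos]
        pos += 1
        name = data[pos:pos + name_len].decode("utf-8")
        pos += name_len

        # One-byte type tag that says how to read the value.
        tag = data[pos:pos + 1]
        pos += 1

        if tag == b"S":    # long string: 4-byte length, then the bytes
            (size,) = struct.unpack_from(">I", data, pos)
            pos += 4
            value = data[pos:pos + size]
            pos += size
        elif tag == b"I":  # signed 32-bit integer
            (value,) = struct.unpack_from(">i", data, pos)
            pos += 4
        elif tag == b"l":  # signed 64-bit integer (the tag from the traceback)
            (value,) = struct.unpack_from(">q", data, pos)
            pos += 8
        elif tag == b"t":  # boolean stored in one byte
            value = data[pos] != 0
            pos += 1
        else:
            # Same failure mode as in the log: unknown tag, no way to continue.
            raise ValueError(repr(tag))

        table[name] = value
    return table


def encode_long_header(name: str, value: int) -> bytes:
    """Build a single table entry whose value uses the 'l' tag (test helper)."""
    raw = name.encode("utf-8")
    return bytes([len(raw)]) + raw + b"l" + struct.pack(">q", value)


if __name__ == "__main__":
    print(decode_field_table(encode_long_header("x-delivery-count", 1)))
```

A decoder without a branch for 'l' cannot skip the value, because the tag is the only thing that tells it how many bytes to read. That would be consistent with the worker staying stuck until the message is removed from the queue.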
