Idempotent ZeroRPC server

135 views
Skip to first unread message

Shiplu Mokaddim

unread,
Nov 7, 2019, 7:55:56 AM11/7/19
to zerorpc
I am implementing a server with idempotency.  The idea is if an event is sent with the same message id, and there is already an active task for that event we discard the new event. This is how I have implemented it.


class IdempotentContextedServer(zerorpc.Server):
    def _async_task(self, initial_event):
        channel_id = initial_event.header.get(u'message_id')
        logger.info("executing _async_task for %s", channel_id)
        if channel_id in self._multiplexer.active_channels:
            logger.info('A request with same channel_id %s is already in progress. Ignoring this one', channel_id)
            return

        return super(IdempotentContextedServer, self)._async_task(initial_event)



After implementing this, I am getting the following error when discard is in place.

ERROR: /!\ gevent_zeromq BUG /!\ catching up after missing event (RECV) /!\

My questions
1. Is my implementation correct? if not, could you guide me how can I implement it
2. If the implementation is correct, why I am getting this error? and how can I get rid of this error?

Thanks in advance

François-Xavier Bourlet

unread,
Nov 9, 2019, 5:44:26 PM11/9/19
to zerorpc
This is a warning because of a race condition with zeromq edge triggering. It is notot related to your code. Zerorpc recovers, but you do get some increased latency when it happens.

Otherwise, your code is incorrect. msg_ids are unique to every message. They will never repeat.

Also something idempotent means that redoing the work won't change any state. What you are try to do is to not schedule work if some work with the same identifier is already being processed.

You should use your own identifiers to implement this.



--
You received this message because you are subscribed to the Google Groups "zerorpc" group.
To unsubscribe from this group and stop receiving emails from it, send an email to zerorpc+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/zerorpc/cb25231d-e8bd-4722-afe3-92fc735c465a%40googlegroups.com.

Shiplu Mokaddim

unread,
Nov 11, 2019, 4:02:41 AM11/11/19
to zerorpc
Instead of `messag_id` I can use something different. Then I have to keep my own message id registry. Something like this,

class UniqueContextedServer(zerorpc.Server):
    def __init__(self, *a, **kw):
        super(IdempotentContextedServer, self).__init__(*a, **kw)
        self.active_contexts = set([])

    def _async_task(self, initial_event):
        context_id = initial_event.header.get(u'context_id')
        if context_id is not None and context_id in self.active_contexts:
            return

        self.active_contexts.add(context_id)
        try:
            return super(IdempotentContextedServer, self)._async_task(initial_event)
        finally:
            self.active_contexts.discard(context_id)





About the increased latency introduced by  the race condition on zeromq, how can I reduce the latency? The frequency of this gevent_zeromq bug has increased a lot after implementing `IdempotentContextedServer`. I think this will remain similar when I use `UniqueContextedServer`. So I need a way to get around that increased latency.

Shiplu Mokaddim

unread,
Nov 11, 2019, 4:08:26 AM11/11/19
to zerorpc
By super(IdempotentContextedServer, self)._async_task(initial_event)
I actually meant super(UniqueContextedServer, self)._async_task(initial_event)
Reply all
Reply to author
Forward
0 new messages