Best practices and patterns for raising unsolicited events and implementing pub/sub?

mai...@gmail.com

Jan 30, 2017, 6:58:21 AM
to grpc.io
Hi.

What is the proper way to implement unsolicited events (fired by the server toward all its clients)? Should I use a never-returning server-streaming RPC method (i.e. a "subscribe" method) so that the server can keep a set of IServerStreamWriter references and use them to reach the clients?

The same pattern should apply to a pub/sub scenario, as far as I understand.

Thanks!

Carl Mastrangelo

Jan 31, 2017, 3:39:42 PM
to grpc.io, mai...@gmail.com
Your client should establish a connection to the server and just wait for events to come back. What you described sounds right, though you should probably refresh connections every hour or so. Connections go stale occasionally and you should design for that possibility.

What language are you using?

mai...@gmail.com

Feb 1, 2017, 5:55:19 AM
to grpc.io, mai...@gmail.com
Thanks Carl.

I am using C#, and so far I have developed a simple client/server pair where I use:

(server-side) a ConcurrentDictionary<string, IServerStreamWriter<SubscribeStateReply>> to keep subscriber "references" (i.e. the back channels the server will send event notifications through)

To keep the "back channels" open, I use the following Subscribe method implementation (server-side):

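public override async Task SubscribeState(SubscribeStateRequest request, IServerStreamWriter<SubscribeStateReply> responseStream, ServerCallContext context)
{
    // Register this client's response stream under its peer address.
    _subscribers[context.Peer] = responseStream;

    // Keep the call (and therefore the stream) open until the subscriber is removed.
    while (_subscribers.ContainsKey(context.Peer)) await Task.Delay(TimeSpan.FromSeconds(5));
}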
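
One possible alternative to the 5-second polling loop above, as a minimal sketch under assumptions: instead of re-checking the dictionary, the method could wait on the call's cancellation token, which to my understanding gRPC C# exposes as ServerCallContext.CancellationToken and signals when the call is cancelled. WhenCancelled is a hypothetical helper name, not part of gRPC or .NET; the snippet only needs the base class library.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class CancellationTokenExtensions
{
    // Hypothetical helper (assumed name): returns a task that completes
    // once the given token is cancelled.
    public static Task WhenCancelled(this CancellationToken token)
    {
        // Run continuations asynchronously so the awaiting code does not
        // execute inline on the thread that triggers the cancellation.
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // If the token is already cancelled, Register invokes the callback
        // immediately, so the returned task is already completed.
        token.Register(() => tcs.TrySetResult(true));

        return tcs.Task;
    }
}
```

Inside SubscribeState this would be used as "await context.CancellationToken.WhenCancelled();" after registering the stream, followed by removing context.Peer from _subscribers, ideally in a finally block so the entry is cleaned up on every exit path.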

Also, to "fire" events while coping with potentially disconnected clients (i.e. subscribers), I didn't find anything better than:

// Notify every subscriber except the caller that triggered the update.
foreach (var sub in _subscribers.Where(s => s.Key != context.Peer))
{
    // Serialize writes and counter updates across concurrent calls.
    await _semaphore.WaitAsync().ConfigureAwait(false);

    if (!_subscribersExceptionCount.ContainsKey(sub.Key)) _subscribersExceptionCount[sub.Key] = 0;

    try
    {
        await sub.Value.WriteAsync(new SubscribeStateReply() { UpdatedState = request.State });

        // A successful write resets the consecutive-failure count.
        _subscribersExceptionCount[sub.Key] = 0;
    }
    catch (Exception ex)
    {
        _subscribersExceptionCount[sub.Key]++;

        // Drop the subscriber after 10 consecutive failed writes.
        if (_subscribersExceptionCount[sub.Key] >= 10)
        {
            IServerStreamWriter<SubscribeStateReply> removed;
            if (_subscribers.TryRemove(sub.Key, out removed))
            {
                int currentCount;
                _subscribersExceptionCount.TryRemove(sub.Key, out currentCount);
            }
        }

        Console.WriteLine(ex);
    }
    finally
    {
        _semaphore.Release();
    }
}

...where _subscribersExceptionCount is a dictionary that keeps the count of consecutive write failures for each subscriber.
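
For illustration only, the same fan-out could be pulled into one small class that keeps the writer and its failure count together, so the two dictionaries cannot drift apart. This is a sketch under assumptions, not a tested implementation: IStreamWriter<T> is a hypothetical stand-in for gRPC's IServerStreamWriter<T> so that the snippet compiles without the gRPC package, and SubscriberRegistry and its members are assumed names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Grpc.Core.IServerStreamWriter<T>.
public interface IStreamWriter<T>
{
    Task WriteAsync(T message);
}

// Assumed name: tracks subscribers together with their failure counts.
public sealed class SubscriberRegistry<T>
{
    private sealed class Entry
    {
        public IStreamWriter<T> Writer;
        public int ConsecutiveFailures;
    }

    private readonly ConcurrentDictionary<string, Entry> _entries =
        new ConcurrentDictionary<string, Entry>();
    private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
    private readonly int _maxFailures;

    public SubscriberRegistry(int maxFailures = 10)
    {
        _maxFailures = maxFailures;
    }

    public int Count => _entries.Count;

    public bool Contains(string peer) => _entries.ContainsKey(peer);

    // Adds the subscriber, or replaces its writer and resets its count.
    public void Add(string peer, IStreamWriter<T> writer) =>
        _entries[peer] = new Entry { Writer = writer };

    public bool Remove(string peer) => _entries.TryRemove(peer, out _);

    // Sends the message to every subscriber except the sender. A subscriber
    // is dropped after maxFailures consecutive failed writes.
    public async Task PublishAsync(string senderPeer, T message)
    {
        // One publish at a time, so each stream sees one write at a time.
        await _writeLock.WaitAsync().ConfigureAwait(false);
        try
        {
            foreach (var pair in _entries)
            {
                if (pair.Key == senderPeer) continue;

                var entry = pair.Value;
                try
                {
                    await entry.Writer.WriteAsync(message).ConfigureAwait(false);
                    entry.ConsecutiveFailures = 0;
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    if (++entry.ConsecutiveFailures >= _maxFailures)
                        _entries.TryRemove(pair.Key, out _);
                }
            }
        }
        finally
        {
            _writeLock.Release();
        }
    }
}
```

With something like this, SubscribeState would call Add(context.Peer, ...) through a thin adapter around responseStream, and the method that fires events would reduce to a single PublishAsync(context.Peer, reply) call. The trade-off is that a slow subscriber delays all the others, since writes are awaited one after another under the lock.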

Any suggestions? When you say "refresh connections", do you actually mean sending replies through the stream, i.e. some kind of heartbeat notification?

Thanks!

mai...@gmail.com

Feb 7, 2017, 10:27:17 AM
to grpc.io, mai...@gmail.com
...any news? Has anybody set up a similar solution?

Thanks!

Carl Mastrangelo

Feb 8, 2017, 12:02:25 PM
to grpc.io, mai...@gmail.com
I am sorry, but I am not that familiar with the C# API, so I can't give any advice there.

jonatha...@gmail.com

Sep 10, 2018, 1:47:31 PM
to grpc.io
Hello,

Have you found an answer to your question? I am also interested in implementing the same system.

Thanks,

Jonathan