pubsub examples?

2,022 views
Skip to first unread message

Mark NS

unread,
Aug 26, 2016, 6:13:32 AM8/26/16
to grpc.io
Hi,

I'm looking into using gRPC for a line-of-business app with 10-100 users. Python server side, C# client side. The server side application is stateful, and clients will have both request-response and streaming communications.
 
I was previously working with ZeroMQ (changing to gRPC for firewall traversal etc), which has pubsub patterns built in. Are there any examples of such a pattern implemented with gRPC? (I saw the post about cloud pubsub, but I am looking for something brokerless).

- Mark

Varun Talwar

unread,
Aug 26, 2016, 12:50:20 PM8/26/16
to Mark NS, grpc.io
hi mark, message queuing patterns usually involve a broker. For your use case, sounds like point to point will suffice. If that is true, gRPC should work well for mentioned languages.


--
You received this message because you are subscribed to the Google Groups "grpc.io" group.
To unsubscribe from this group and stop receiving emails from it, send an email to grpc-io+unsubscribe@googlegroups.com.
To post to this group, send email to grp...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/grpc-io/ae39e96d-ec86-4aee-a200-e70520859128%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Varun Talwar | Product Manager | varun...@google.com | 415-341-7352


Paul Grosu

unread,
Aug 27, 2016, 8:52:15 PM8/27/16
to grpc.io
Hi Mark,

Might the following PubSub via gRPC example be useful:

https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/grpc

Paul

Mark NS

unread,
Aug 29, 2016, 4:00:00 PM8/29/16
to grpc.io
Hi guys,
Thanks for the comments and suggestions so far. @Paul, cheers for the heads-up re cloud pubsub, but using a cloud broker is not suitable for my use case.

I found some time to implement a prototype pubsub servicer, and I'd like to share it because I still have a few doubts if I'm doing things right, and a few specific questions.

The servicer has a single bidirectional streaming endpoint. I spawn a new thread in the implementation to process incoming requests, generate a unique id for the connection, do some book-keeping, and send out updates to subscribed clients as they are published.
                                                                                    
Specific questions:
1. Is it possible to know when a client has disconnected, so I can release any resources that have been allocated?
2. Would it be more idiomatic to use threadlocals somehow for the book-keeping?
                                                         
Any other comments and criticism of the code very welcome ;)


# ------------------------------ pubsub.proto
package pubsub;

service PubSub {
rpc subscribe(stream SubscriptionRequest) returns (stream SubscriptionReply);
}

message SubscriptionRequest {
string topic = 1;
}

message SubscriptionReply {
string topic = 1;
string message = 2;
}

# ------------------------------ pubsub_server.py
import queue
import random
import threading
import uuid
from collections import defaultdict
import time
from concurrent import futures

import grpc

import pubsub_pb2

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class PubSub(pubsub_pb2.PubSubServicer):
def __init__(self):
self.client_q = dict()
self.topics = defaultdict(list)

t = threading.Thread(target=self._fake_publisher)
t.start()

def publish(self, topic, message):
reply = pubsub_pb2.SubscriptionReply(topic=topic,
message=message)
print("Publishing topic {} message {}".format(topic, message))
for clientid in self.topics[topic]:
self.client_q[clientid].put(reply)

def subscribe(self, request_iterator, context):
clientid = uuid.uuid4()
self.client_q[clientid] = queue.Queue()

t = threading.Thread(target=self._process_subscriptions,
args=(clientid, request_iterator))
t.start()

while True:
item = self.client_q[clientid].get()
yield item
self.client_q[clientid].task_done()

# What's the best way to release resources and tidy up
# when the client goes away?

def _process_subscriptions(self, clientid, request_iterator):
for request in request_iterator:
print("Received subscription request from {} for topic {}".format(
clientid, request.topic
))
self.topics[request.topic].append(clientid)

def _fake_publisher(self):
while True:
if len(self.topics):
topic = random.choice(list(self.topics.keys()))
self.publish(topic, str(uuid.uuid4()))
time.sleep(1)


def serve():
server = grpc.server(futures.ThreadPoolExecutor())
pubsub_pb2.add_PubSubServicer_to_server(PubSub(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)


if __name__ == '__main__':
serve()

# ------------------------------ pubsub_client.py
import time
import concurrent

import grpc

import pubsub_pb2

def client(stub):
def my_topics():
yield pubsub_pb2.SubscriptionRequest(topic="grpc")
time.sleep(5)
yield pubsub_pb2.SubscriptionRequest(topic="donkeys")

responses = stub.subscribe(my_topics())
for response in responses:
print("Received topic {} message {}".format(
response.topic, response.message))


def run():
channel = grpc.insecure_channel('localhost:50051')
stub = pubsub_pb2.PubSubStub(channel)

with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(client, stub)
]

for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as exc:
print("Generated an exception {}".format(exc))


if __name__ == '__main__':
run()

Mark NS

unread,
Aug 31, 2016, 3:37:59 AM8/31/16
to grpc.io
Anyone have an answer to this question in particular?

1. Is it possible to know when a client has disconnected, so I can release any resources that have been allocated?


Paul Grosu

unread,
Aug 31, 2016, 11:07:44 PM8/31/16
to grpc.io
Hi Mark,

Try to read this example in how to use ServerContext's AsyncNotifyWhenDone:

  https://github.com/grpc/grpc/blob/master/test/cpp/end2end/async_end2end_test.cc

Hope it helps,
Paul
Reply all
Reply to author
Forward
0 new messages