Non-blocking single-threaded streaming C++ server


Todd Defilippi

Feb 22, 2018, 2:01:37 PM
to grpc.io
I am trying to write a streaming server as part of implementing a gRPC dial-out collector for Cisco's model-driven telemetry (https://github.com/cisco/bigmuddy-network-telemetry-proto/blob/master/proto_archive/mdt_grpc_dialout/mdt_grpc_dialout.proto).

Is there a way to create the gRPC server with C++ that does not require blocking?  The basic synchronous examples all require blocking, which will not work, so I have been looking at the various asynchronous examples.  I haven't been able to get those to work but I'm not sure I'm doing it correctly.

It seems like I would want to in some way use the CompletionQueues to hold messages that come in and process them when I can.  Is there a reason I would want to use Next() versus AsyncNext() (it seems the former blocks and the latter does not but I'm not sure)?  How does the combination of RegisterService() and AddCompletionQueue() work?

Thanks,
Todd

Yang Gao

Mar 12, 2018, 3:14:46 PM
to grpc.io
Hi Todd,

Sorry for the late reply.

You are right that Next blocks until the next event is available.
For your purpose, you may want to use AsyncNext with a deadline, on a server that uses the async service.
The deadline should be the next point in time at which you want to break out and do something else on the server.
Note that the gRPC library uses the thread donated via Next or AsyncNext to do some background work, so calling AsyncNext only infrequently and with a very short deadline may not be a good idea.
Also, regarding single-threadedness: the current gRPC implementation creates internal threads for background work such as timer handling. As a result, you will not have a truly single-threaded server even if you use only one thread for the server.
This may not be the final form, as we intend to support a threading model fully controlled by the user. However, there is no timeline at the moment for removing those background threads.
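
As a rough illustration of the loop shape described above, here is a minimal sketch that does not use gRPC at all. FakeQueue, PollUntil, PollStatus, LoopStats and RunLoop are hypothetical names made up for this example; PollUntil only stands in for the role AsyncNext plays. In a real server the deadline would be passed to AsyncNext on the server completion queue, and the "other work" counter would be whatever else the thread needs to do.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <thread>

// Hypothetical stand-in for a completion queue; this is NOT the gRPC API.
enum class PollStatus { kGotEvent, kTimeout };

class FakeQueue {
 public:
  void Push(int tag) { events_.push_back(tag); }

  // Returns an event if one is ready. Otherwise waits until the deadline and
  // reports a timeout, which is the behaviour assumed here for AsyncNext.
  PollStatus PollUntil(int* tag,
                       std::chrono::steady_clock::time_point deadline) {
    assert(tag != nullptr);
    if (events_.empty()) {
      std::this_thread::sleep_until(deadline);
      return PollStatus::kTimeout;
    }
    *tag = events_.front();
    events_.pop_front();
    return PollStatus::kGotEvent;
  }

 private:
  std::deque<int> events_;
};

struct LoopStats {
  int events_handled = 0;
  int other_work_done = 0;
};

// One thread alternates between polling with a short deadline and other work.
LoopStats RunLoop(FakeQueue& queue, int iterations) {
  LoopStats stats;
  for (int i = 0; i < iterations; ++i) {
    int tag = 0;
    // The deadline is "the next time we want to break out", not time zero.
    const auto deadline =
        std::chrono::steady_clock::now() + std::chrono::milliseconds(5);
    if (queue.PollUntil(&tag, deadline) == PollStatus::kGotEvent) {
      ++stats.events_handled;  // a real server would dispatch on the tag here
    }
    ++stats.other_work_done;  // placeholder for the non-gRPC work
  }
  return stats;
}
```

The 5 ms deadline is an arbitrary value chosen for the sketch. The point is only that each poll gives the queue a real window of time instead of returning immediately.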

Todd Defilippi

Mar 12, 2018, 3:55:42 PM
to grpc.io
Thanks!

I have been messing around with using AsyncNext.  My long-term plan is to have some timer that calls back and checks if there is anything in the completion queue via AsyncNext.  For now, I've just been trying to get some basics to work.  Below I have the code I'm using.  (Ignore the sleeping for now, that's just so it loops a little slower for debugging purposes.)  The issue is, while I do get AsyncNext returning with ok=true and a valid tag, I do not get anything when I try to read from the stream.  Am I not using the stream correctly?

    std::unique_ptr<gRPCMdtDialout::AsyncService> service2;
    service2.reset(new gRPCMdtDialout::AsyncService());

    ServerBuilder builder;
    builder.AddListeningPort(addr, grpc::InsecureServerCredentials());
    builder.RegisterService(service2.get());
    std::unique_ptr<grpc::ServerCompletionQueue> cq = builder.AddCompletionQueue();
    std::unique_ptr<Server> server(builder.BuildAndStart());

    ServerContext ctx_;
    ServerAsyncReaderWriter<MdtDialoutArgs, MdtDialoutArgs> stream_(&ctx_);
    service2->RequestMdtDialout(&ctx_, &stream_, cq.get(), cq.get(), this);

    while (true) {
        void *tag = NULL;
        bool ok = false;
        bool ret = cq->AsyncNext(&tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
        if (ret == 1 && ok) {
            MdtDialoutArgs args;
            stream_.Read(&args, tag);
            cout << " read " << args.ByteSizeLong() << " " << args.reqid() << endl;
        }
        sleep(5);
    }

Thanks,
Todd

Yang Gao

Mar 13, 2018, 8:01:42 PM
to Todd Defilippi, grpc.io
Maybe you are oversimplifying the code, but the snippet does not look correct to me.

1. Your Read call is asynchronous: it behaves more like "StartRead, and tell me via the tag when something has arrived". So it does not make sense to log args as soon as Read returns.
2. You request an MdtDialout but do not seem to handle its tag.
3. You request an MdtDialout only once, which means you will only ever get one incoming RPC.
4. You cannot issue a second Read before the first one's tag comes back.
5. As I mentioned, using time_0 may not be sufficient for gRPC to do background work.
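
Points 1, 2 and 4 amount to a small per-call state machine driven by tags. The sketch below models that idea with standard C++ only. Event, FakeStream, CallHandler and Drive are hypothetical stand-ins invented for this example, not gRPC types. It handles a single call and leaves out point 3 (requesting the next RPC when one arrives) to stay short.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Everything here is a hypothetical stand-in; none of it is the gRPC API.
struct Event {
  void* tag;
  bool ok;
};

class FakeStream {
 public:
  FakeStream(std::deque<Event>* events, std::deque<std::string> msgs)
      : events_(events), msgs_(std::move(msgs)) {}

  // Only *starts* a read; `out` is not filled in until Complete() runs.
  void StartRead(std::string* out, void* tag) {
    assert(pending_out_ == nullptr);  // point 4: one outstanding read at a time
    pending_out_ = out;
    pending_tag_ = tag;
  }

  // Simulates the network finishing the pending read and posting its tag.
  void Complete() {
    if (pending_out_ == nullptr) return;
    const bool ok = !msgs_.empty();  // ok == false models the peer closing
    if (ok) {
      *pending_out_ = msgs_.front();
      msgs_.pop_front();
    }
    pending_out_ = nullptr;
    events_->push_back({pending_tag_, ok});
  }

 private:
  std::deque<Event>* events_;
  std::deque<std::string> msgs_;
  std::string* pending_out_ = nullptr;
  void* pending_tag_ = nullptr;
};

class CallHandler {
 public:
  enum class State { kWaitingForCall, kReading, kDone };

  CallHandler(FakeStream* stream, std::vector<std::string>* received)
      : stream_(stream), received_(received) {}

  State state() const { return state_; }

  // Called each time this handler's tag comes out of the queue.
  void Proceed(bool ok) {
    if (state_ == State::kWaitingForCall) {
      state_ = State::kReading;  // point 2: the call-arrived tag is handled
      stream_->StartRead(&msg_, this);
    } else if (state_ == State::kReading) {
      if (!ok) {
        state_ = State::kDone;
        return;
      }
      received_->push_back(msg_);       // point 1: msg_ is valid only now
      stream_->StartRead(&msg_, this);  // start the next read
    }
  }

 private:
  FakeStream* stream_;
  std::vector<std::string>* received_;
  std::string msg_;
  State state_ = State::kWaitingForCall;
};

// Runs one simulated call to completion and returns the messages it read.
std::vector<std::string> Drive(std::deque<std::string> msgs) {
  std::deque<Event> events;
  FakeStream stream(&events, std::move(msgs));
  std::vector<std::string> received;
  CallHandler handler(&stream, &received);
  events.push_back({&handler, true});  // simulates the call-arrived tag
  while (handler.state() != CallHandler::State::kDone) {
    if (events.empty()) {
      stream.Complete();
      continue;
    }
    const Event ev = events.front();
    events.pop_front();
    static_cast<CallHandler*>(ev.tag)->Proceed(ev.ok);
  }
  return received;
}
```

Calling Drive with two queued messages returns those two messages in order. The message is consumed in Proceed, after the tag has come back, rather than right after StartRead returns.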
