streaming asio not working as expected

Jeff
Mar 12, 2019, 5:21:48 PM
to grpc.io

Hello all,

Sorry for the long post. I am trying to figure out how to use gRPC async streaming and I am coming up short. I put together the client and server below from the helloworld example plus an example I saw on the internet. It works, but the server only serves a single client: the second client blocks forever, waiting for a response from the server, while the first client works just fine. Can someone provide some insight into how to fix this so my async server can process multiple clients? Thanks.


PROTO
--------------------------------------------------

syntax = "proto3";

package asio_server;

// The greeting service definition.
service Greeter {
  // Sends a greeting as a stream of replies.
  rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings.
message HelloReply {
  string message = 1;
}



CLIENT
--------------------------------------------------

/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

#include <iostream>
#include <memory>
#include <string>

#include <grpcpp/grpcpp.h>

#include "asio_server.grpc.pb.h"

using namespace grpc;
using namespace asio_server;

class GreeterClient {
 public:
  GreeterClient(std::shared_ptr<Channel> channel)
      : stub_(Greeter::NewStub(channel)) {}

  std::string SayHello(const std::string& user) {
    HelloRequest request;
    request.set_name(user);
    HelloReply reply;
    ClientContext context;

    // Start the server-streaming call; Read() returns false once the
    // server closes the stream.
    std::unique_ptr<ClientReader<HelloReply> > reader(
        stub_->SayHello(&context, request));

    while (reader->Read(&reply)) {
      std::cout << "Rcvd reply: " << reply.message() << std::endl;
    }

    // Finish() retrieves the final status of the RPC; always call it on a
    // ClientReader once reading is done.
    Status status = reader->Finish();
    if (!status.ok()) {
      std::cout << "RPC failed: " << status.error_message() << std::endl;
    }
    return std::string("done");
  }

 private:
  std::unique_ptr<Greeter::Stub> stub_;
};

int main(int argc, char** argv) {
  GreeterClient greeter(grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials()));
  std::string user("world");
  std::string reply = greeter.SayHello(user);
  std::cout << "Greeter received: " << reply << std::endl;

  return 0;
}



SERVER
--------------------------------------------------
#include <algorithm>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include <grpc/support/log.h>
#include <grpcpp/grpcpp.h>

#include "asio_server.grpc.pb.h"

using namespace grpc;
using namespace asio_server;

class ServerImpl final {
 public:
  ~ServerImpl() {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  // There is no shutdown handling in this code.
  void Run() {
    std::string server_address("0.0.0.0:50051");

    ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService(&service_);
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    cq_ = builder.AddCompletionQueue();
    // Finally assemble the server.
    server_ = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRpcs();
  }

 private:
  // Class encompassing the state and logic needed to serve a request.
  class CallData {
   public:
    // Take in the "service" instance (in this case representing an
    // asynchronous server) and the completion queue "cq" used for
    // asynchronous communication with the gRPC runtime.
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
        : service_(service), cq_(cq), repliesSent_(0), responder_(&ctx_),
          status_(CREATE) {
      // Invoke the serving logic right away.
      Proceed();
    }

    void Proceed() {
      if (status_ == CREATE) {
        // Make this instance progress to the PROCESS state.
        status_ = PROCESS;
        std::cout << "Creating CallData for new client connections: " << this
                  << std::endl;
        // As part of the initial CREATE state, we *request* that the system
        // start processing SayHello requests. In this request, "this" acts as
        // the tag uniquely identifying the request (so that different CallData
        // instances can serve different requests concurrently), in this case
        // the memory address of this CallData instance.
        service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  (void*)this);
      } else if (status_ == PROCESS) {
        // Spawn a new CallData instance to serve new clients while we process
        // the one for this CallData. The instance will deallocate itself as
        // part of its FINISH state.
        new CallData(service_, cq_);

        // The actual processing: write the first reply of the stream.
        std::string prefix("Hello ");
        reply_.set_message(prefix + request_.name() +
                           std::to_string(repliesSent_ + 1));
        std::cout << "Sending response: " << this << " : " << reply_.message()
                  << std::endl;
        responder_.Write(reply_, this);
        status_ = PROCESSING;
        repliesSent_++;
      } else if (status_ == PROCESSING) {
        if (repliesSent_ == MAX_REPLIES) {
          // And we are done! Let the gRPC runtime know we've finished, using
          // the memory address of this instance as the uniquely identifying
          // tag for the event.
          status_ = FINISH;
          responder_.Finish(Status::OK, this);
        } else {
          // The previous Write completed; write the next reply in the stream.
          std::string prefix("Hello ");
          reply_.set_message(prefix + request_.name() +
                             std::to_string(repliesSent_ + 1));
          std::cout << "Sending response: " << this << " : " << reply_.message()
                    << std::endl;
          responder_.Write(reply_, this);
          // Note: sleeping here blocks the thread that drives the completion
          // queue, so no other RPC can make progress during the sleep.
          std::this_thread::sleep_for(std::chrono::milliseconds(3000));
          repliesSent_++;
        }
      } else {
        GPR_ASSERT(status_ == FINISH);
        std::cout << "Completed RPC for: " << this << std::endl;
        // Once in the FINISH state, deallocate ourselves (CallData).
        delete this;
      }
    }

   private:
    // The means of communication with the gRPC runtime for an asynchronous
    // server.
    Greeter::AsyncService* service_;
    // The producer-consumer queue for asynchronous server notifications.
    ServerCompletionQueue* cq_;

    // Context for the rpc, allowing to tweak aspects of it such as the use
    // of compression, authentication, as well as to send metadata back to the
    // client.
    ServerContext ctx_;

    // What we get from the client.
    HelloRequest request_;
    // What we send back to the client.
    HelloReply reply_;

    uint32_t repliesSent_;
    const uint32_t MAX_REPLIES = 1000000000;

    // The means to get back to the client.
    ServerAsyncWriter<HelloReply> responder_;

    // Let's implement a tiny state machine with the following states.
    enum CallStatus { CREATE, PROCESS, PROCESSING, FINISH };
    CallStatus status_;  // The current serving state.
  };

  // This can be run in multiple threads if needed.
  void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
      // Block waiting to read the next event from the completion queue. The
      // event is uniquely identified by its tag, which in this case is the
      // memory address of a CallData instance.
      // The return value of Next should always be checked. This return value
      // tells us whether there is any kind of event or cq_ is shutting down.
      GPR_ASSERT(cq_->Next(&tag, &ok));
      GPR_ASSERT(ok);
      static_cast<CallData*>(tag)->Proceed();
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  Greeter::AsyncService service_;
  std::unique_ptr<Server> server_;
};

int main(int argc, char** argv) {
  ServerImpl server;
  server.Run();

  return 0;
}


OUTPUT
--------------------------------------------------
Server terminal:
~/grpc/examples/cpp/asio$ ./asio_server
Server listening on 0.0.0.0:50051
Creating CallData for new client connections: 0x8a25e0
Creating CallData for new client connections: 0x8aaff0
Sending response: 0x8a25e0 : Hello world1
Sending response: 0x8a25e0 : Hello world2
Sending response: 0x8a25e0 : Hello world3
Sending response: 0x8a25e0 : Hello world4
Sending response: 0x8a25e0 : Hello world5
Sending response: 0x8a25e0 : Hello world6
Sending response: 0x8a25e0 : Hello world7

Client 1 Terminal:
~/grpc/examples/cpp/asio$ ./asio_client
Rcvd reply: Hello world1
Rcvd reply: Hello world2
Rcvd reply: Hello world3
Rcvd reply: Hello world4
Rcvd reply: Hello world5
Rcvd reply: Hello world6
Rcvd reply: Hello world7

Client 2 Terminal:

~/grpc/examples/cpp/asio$ ./asio_client

// blocks forever in while (reader->Read(&reply))


Jeff
Mar 12, 2019, 5:31:24 PM
to grpc.io
I see now that the issue was that client 1 wasn't terminating, so the server never moved on to client 2. I expected the server to alternate (potentially non-deterministically) between clients rather than serve them in order. Is there a way around this?
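
One thing I am considering is draining the completion queue from multiple threads, along these lines (untested sketch; HandleRpcsMultiThreaded is a hypothetical helper added to ServerImpl, and it needs <vector> and <thread>):

// Drive the same completion queue from several threads so one slow
// Proceed() can't stall every other RPC. CompletionQueue::Next is safe
// to call concurrently, and each CallData has at most one tag in flight,
// so a given CallData is only ever driven by one thread at a time.
void HandleRpcsMultiThreaded(int num_threads) {
  std::vector<std::thread> pollers;
  for (int i = 0; i < num_threads; ++i) {
    // Each HandleRpcs() call also seeds one listening CallData, which is
    // harmless: it just means more outstanding RequestSayHello slots.
    pollers.emplace_back([this] { HandleRpcs(); });
  }
  for (auto& t : pollers) {
    t.join();
  }
}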

sheen...@google.com
Mar 19, 2019, 2:40:38 PM
to grpc.io
This is not the right forum for debugging application code. Please try StackOverflow for that, since there are far more gRPC users than gRPC developers, and one of them may decide to spend some time on this for you. This forum is best for clarifications or discussions of feature requests.

As a small point, you have a sleep and many couts in your async processing code, which is not a good idea since they block other operations during that time. Removing them likely won't solve your issue by itself, but while they run you are preventing the CQ::Next loop from making progress, so no further tags can be processed in that time. You should keep that section as fast as possible.
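
For example, you could model the delay as another completion-queue event instead of blocking the polling thread: add a SLEEPING state to the CallStatus enum, a grpc::Alarm member to CallData (#include <grpcpp/alarm.h>), and rework the PROCESSING branch of Proceed() roughly like this (untested sketch):

} else if (status_ == PROCESSING) {   // the previous Write just completed
  if (repliesSent_ == MAX_REPLIES) {
    status_ = FINISH;
    responder_.Finish(Status::OK, this);
  } else {
    // Instead of sleep_for: ask the alarm to post `this` back onto the
    // completion queue in ~3s. Only one tag is in flight per CallData.
    status_ = SLEEPING;
    alarm_.Set(cq_, std::chrono::system_clock::now() +
                        std::chrono::milliseconds(3000), this);
  }
} else if (status_ == SLEEPING) {     // the alarm fired
  reply_.set_message("Hello " + request_.name() +
                     std::to_string(repliesSent_ + 1));
  responder_.Write(reply_, this);
  repliesSent_++;
  status_ = PROCESSING;
}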

As far as serving clients in order: gRPC does not enforce any ordering for clients (we have load-balancing options for picking _servers_), so that is probably due to your code or to when your threads get scheduled.

Jeff
Mar 19, 2019, 3:13:45 PM
to grpc.io
Great, here are some feature requests (now that I am sure I have the right forum):

1) Add an async streaming example to your C++ example code. Launching a thread per connection is somewhat wasteful, so streaming and async go hand in hand. It is a large omission.
2) When a C++ gRPC app is spinning on cq->Next, it serves clients in a non-uniform manner. A *very* non-uniform manner. Clients can starve. This should be addressed, as it does not seem like an uncommon use case.
3) AsyncNext takes a deadline. If that deadline is 0 (using any gpr_clock_type), I expect it to return immediately if there is nothing in the queue. This happens for GPR_CLOCK_PRECISE but for no others. There should simply be a TryNext method (see the sketch after this list).
4) It would be nice if there were some way to specify socket options, i.e. TCP_NODELAY and other such options should be readily accessible. Not supporting this is more than just an inconvenience for a programmer, imo.
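
For 3), what I have in mind is roughly this hypothetical helper (not an existing gRPC API; per my point above, today it only actually returns immediately with GPR_CLOCK_PRECISE):

#include <grpc/support/time.h>
#include <grpcpp/grpcpp.h>

// Hypothetical TryNext: poll the completion queue without blocking by
// passing AsyncNext a deadline that is already in the past. Returns true
// and fills in (tag, ok) only if an event was ready.
bool TryNext(grpc::CompletionQueue* cq, void** tag, bool* ok) {
  gpr_timespec past = gpr_time_0(GPR_CLOCK_MONOTONIC);  // clock epoch, long expired
  return cq->AsyncNext(tag, ok, past) == grpc::CompletionQueue::GOT_EVENT;
}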

Also, please clarify which threads gRPC launches at startup (for C++, or in general if it is all the same). Can you enumerate them and briefly discuss their purpose?

Thank you so much.

Vijay Pai
Mar 21, 2019, 1:52:31 PM
to grpc.io
Thanks for the comments.

1. The async streaming example will happen in the relatively near future, most probably when the callback API is considered production-ready. Meanwhile, I can suggest that you use some of the code in test/cpp/end2end or test/cpp/qps as a model of the async streaming API.
2. We have lots of code running like this in production and haven't seen this sort of behavior. As the previous responder pointed out, putting a sleep in an async-processing loop is a little suspect, so could this simply be the result of the unusual code being tested?
3. This is indeed the expected behavior, and our test/cpp/end2end_test looks at this with CLOCK_REALTIME. Can you give sample code where you're seeing blocking when giving a deadline in the past?
4. TCP_NODELAY is the default for all sockets created by gRPC on POSIX systems. We would prefer to expose higher-level concepts through the API, like latency-sensitive, throughput-sensitive, etc., but this is something that we could provide in the future: we have a socket-mutator option to set socket features, but that API is not final or stable at the current time. Can I suggest that you file a feature request in our GitHub repo so that it can be prioritized and tracked?

gRPC launches its own threads in two categories: executor and timer. The executor is used for DNS resolution as well as for offloading certain tasks that are off the critical path (certain writes in the transport layer, etc.). The timer threads maintain all the time tracking in gRPC for timing-sensitive features of the transport or the API (like alarms). Otherwise, the work is done on the application's own threads when it calls CQ::Next or a blocking RPC method.

- Vijay

Jeff
Mar 21, 2019, 2:14:52 PM
to grpc.io

2) You can take the sleeps out of the code and try it yourself; the sleeps don't matter. If you are in a tight loop calling cq->Next with multiple clients, is it inconceivable that the gRPC core could starve a client? I am not familiar with your design, but I think if you take a close look at it, you will be able to reason about it purely from code inspection. I witnessed it often, so it simply can't be impossible and almost certainly isn't implausible. If you tinker with my example, I would be surprised if you can't reproduce it. I ran on Ubuntu 16.04 using gRPC 1.19.
3) Try doing an AsyncNext with deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_nanos(0, GPR_TIMESPAN)). I expect it to return immediately, but I block.
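
In code, the call I am making looks like this (fragment; cq is my completion queue):

void* tag;
bool ok;
// Deadline of "now + 0ns" on the monotonic clock -- already expired, so I
// expect AsyncNext to return TIMEOUT immediately when the queue is empty.
gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                                     gpr_time_from_nanos(0, GPR_TIMESPAN));
auto status = cq->AsyncNext(&tag, &ok, deadline);
// Observed: this blocks instead of returning
// grpc::CompletionQueue::TIMEOUT right away.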