Need help with gRPC C++ client and Java server

mukul singh

Mar 8, 2018, 10:54:01 AM
to grpc.io
Hi All,

I am trying to develop a C++ client for a gRPC server implemented in Java.
The goal of this change is a C++ client for the Apache Ratis project.

The client needs to implement read/write functions for the bi-directional streaming append API defined in the proto file below.

Here is the sequence of events and what I observe:

1) std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto, RaftClientReplyProto>>
           cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1));


The client creates a new ClientAsyncReaderWriter, which invokes the stream observer constructor on the Java server:


  AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
LOG.info("new AppendRequestStreamObserver {}", name);
this.responseObserver = ro;


2) The next call is Write, which writes a RaftClientRequestProto to the stream.
-- This call has no effect on the server, even though the 'ok' flag returned by the completion queue is true.

3) The subsequent WritesDone succeeds as well, and the server log confirms that the stream observer completed:

org.apache.ratis.grpc.client.RaftClientProtocolService: completed request 


Questions:
1) I would like the Write call to invoke the onNext() API on the Java server.
How can this be achieved from a C++ client?

======== Code Below ============

Proto:

syntax = "proto3";
option java_package = "org.apache.ratis.shaded.proto.grpc";
option java_outer_classname = "GRpcProtos";
option java_generate_equals_and_hash = true;
package ratis.grpc;

import "Raft.proto";

service RaftClientProtocolService {
  // A client-to-server RPC to set new raft configuration
  rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
      returns (ratis.common.RaftClientReplyProto) {}

  // A client-to-server stream RPC to append data
  rpc append(stream ratis.common.RaftClientRequestProto)
      returns (stream ratis.common.RaftClientReplyProto) {}
}

Server code:

private class AppendRequestStreamObserver implements
    StreamObserver<RaftClientRequestProto> {
  private final String name = getId() + "-" + streamCount.getAndIncrement();
  private final StreamObserver<RaftClientReplyProto> responseObserver;
  private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
      = new SlidingWindow.Server<>(name, COMPLETED);
  private final AtomicBoolean isClosed;

  AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
    LOG.info("new AppendRequestStreamObserver {}", name);
    this.responseObserver = ro;
    this.isClosed = new AtomicBoolean(false);
  }

  void processClientRequestAsync(PendingAppend pending) {
    try {
      protocol.submitClientRequestAsync(pending.getRequest()
      ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
          pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
      ).exceptionally(exception -> {
        // TODO: the exception may be from either raft or state machine.
        // Currently we skip all the following responses when getting an
        // exception from the state machine.
        responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
        return null;
      });
    } catch (IOException e) {
      throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
    }
  }

  @Override
  public void onNext(RaftClientRequestProto request) {
    try {
      final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
      LOG.info("recieved request " + r.getCallId());
      final PendingAppend p = new PendingAppend(r);
      slidingWindow.receivedRequest(p, this::processClientRequestAsync);
    } catch (Throwable e) {
      responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
    }
  }

  private void sendReply(PendingAppend ready) {
    Preconditions.assertTrue(ready.hasReply());
    if (ready == COMPLETED) {
      close();
    } else {
      LOG.info("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
      responseObserver.onNext(
          ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
    }
  }

  @Override
  public void onError(Throwable t) {
    // for now we just log a msg
    LOG.info(name + ": onError", t);
    slidingWindow.close();
  }

  @Override
  public void onCompleted() {
    LOG.info("completed request ");
    if (slidingWindow.endOfRequests()) {
      close();
    }
  }

Server logs:

2018-03-08 15:44:06,688 INFO org.apache.ratis.grpc.client.RaftClientProtocolService: new AppendRequestStreamObserver 127.0.0.1_9858-3

2018-03-08 15:44:06,688 INFO org.apache.ratis.grpc.client.RaftClientProtocolService: completed request 



Client code:

        std::shared_ptr<Channel> channel = grpc::CreateChannel("localhost:9858",
                        grpc::InsecureChannelCredentials());
        std::unique_ptr<RaftClientProtocolService::Stub> stub = RaftClientProtocolService::NewStub(channel);
        ContainerCommandRequestProto* read_requet = read_container("container1");
        std::cout << ":cmd type is" << read_requet->cmdtype() << std::endl;

        RaftClientRequestProto* req = create_request(read_requet, sizeof(ContainerCommandRequestProto));
        grpc::ClientContext ctx;
        grpc::CompletionQueue cq;
        std::unique_ptr<ClientAsyncReaderWriter<RaftClientRequestProto, RaftClientReplyProto>>
                        cli_stream(stub->Asyncappend(&ctx, &cq, (void*)1));
        void* got_tag;
        bool ok = false;
        cq.Next(&got_tag, &ok);
        std::cout << "tag" << got_tag << "   ok" << ok << std::endl;

        cli_stream->Write(*req, (void*)2);
        cq.Next(&got_tag, &ok);
        std::cout << "tag" << got_tag << "   ok" << ok << std::endl;
        if (ok && got_tag == (void*)2) {
                std::cout << "sent correct request" << std::endl;
                // check reply and status
        }

        cli_stream->WritesDone((void*)3);
        cq.Next(&got_tag, &ok);
        std::cout << "tag" << got_tag << "   ok" << ok << std::endl;
        if (ok && got_tag == (void*)3) {
                std::cout << "sent correct request" << std::endl;
                // check reply and status
        }


Client logs:

cmd type is2
tag0x1   ok1
tag0x2   ok1
sent correct request
tag0x3   ok1
sent correct request
sending request for reply

mukul singh

Mar 9, 2018, 1:39:19 PM
to grpc.io
This was caused by a difference in protobuf versions: the client was running 3.0.0 while the server was running 3.1.0.
Using the same protobuf version on both sides resolved the issue.
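
For anyone who wants to catch this kind of mismatch early, here is a minimal sketch of a version comparison. It assumes the two version strings (for example "3.0.0" and "3.1.0") have already been obtained by some other means, and it assumes that a differing major or minor number should be flagged; both are assumptions for illustration. The helper names parse_version and same_major_minor are hypothetical and not part of protobuf or gRPC.

```cpp
#include <array>
#include <cstddef>
#include <exception>
#include <sstream>
#include <string>

// Hypothetical helper: split "major.minor.patch" into three integers.
// Missing or non-numeric components are treated as 0.
std::array<int, 3> parse_version(const std::string& text) {
    std::array<int, 3> parts{0, 0, 0};
    std::istringstream in(text);
    std::string field;
    for (std::size_t i = 0; i < parts.size() && std::getline(in, field, '.'); ++i) {
        try {
            parts[i] = std::stoi(field);
        } catch (const std::exception&) {
            parts[i] = 0;  // malformed component
        }
    }
    return parts;
}

// Hypothetical policy (an assumption): versions count as matching
// when both the major and the minor numbers are equal.
bool same_major_minor(const std::string& a, const std::string& b) {
    const std::array<int, 3> va = parse_version(a);
    const std::array<int, 3> vb = parse_version(b);
    return va[0] == vb[0] && va[1] == vb[1];
}
```

Under this policy, same_major_minor("3.0.0", "3.1.0") reports a mismatch for the two versions mentioned above, while two 3.1.x versions would be treated as matching.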