Hi,
I managed to write a minimal client/server example that causes a crash in the client code in RunInterceptor and I was wondering if I'm doing something wrong.
Proto File:
syntax = "proto3";
package myService;
service MyService
{
rpc Ping(EmptyMessage) returns (PongResponse) {}
rpc StartStream(EmptyMessage) returns (stream StreamEvent) {}
}
message EmptyMessage
{
}
message PongResponse
{
bool pong = 1;
}
message StreamEvent
{
uint64 value = 1;
uint32 otherValue = 2;
bool yetAnotherValue = 3;
}
Server File:
#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable : 4251) // Disable dll-class warning
# pragma warning(disable : 4127) // Disable conditional expression is constant
#endif
#include "service.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#ifdef _WIN32
# pragma warning(pop)
#endif
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <memory>
class Service final : public myService::MyService::Service
{
public:
Service(std::string const& uri)
{
_builder.AddListeningPort(uri, grpc::InsecureServerCredentials());
_builder.RegisterService(this);
}
void start()
{
_server = _builder.BuildAndStart();
}
void stop()
{
if (_server != nullptr)
{
_server->Shutdown();
}
}
private:
virtual ::grpc::Status Ping(::grpc::ServerContext* /*context*/, const ::myService::EmptyMessage* /*request*/, ::myService::PongResponse* response) override
{
std::cout << "Ping request\n";
response->set_pong(true);
return grpc::Status::OK;
}
virtual ::grpc::Status StartStream(::grpc::ServerContext* /*context*/, const ::myService::EmptyMessage* /*request*/, ::grpc::ServerWriter<::myService::StreamEvent>* writer) override
{
{
auto evt = myService::StreamEvent{};
evt.set_value(0x1);
evt.set_othervalue(0x2);
evt.set_yetanothervalue(false);
writer->Write(evt);
}
return grpc::Status::OK;
}
grpc::ServerBuilder _builder{ grpc::ServerBuilder{} };
std::unique_ptr<grpc::Server> _server{ nullptr };
};
int main(int /*argc*/, char** /*argv*/)
{
auto const ServerAddress = std::string{ "0.0.0.0:50051" };
auto service = Service{ ServerAddress };
service.start();
std::cout << "Server listening on " << ServerAddress << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3000));
service.stop();
return 0;
}
Client File:
#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable : 4251) // Disable dll-class warning
# pragma warning(disable : 4127) // Disable conditional expression is constant
# pragma warning(disable : 4702) // Disable unreachable code
#endif
#include "service.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#ifdef _WIN32
# pragma warning(pop)
#endif
#include <iostream>
#include <string>
class Observer : public grpc::ClientReadReactor<myService::StreamEvent>
{
public:
Observer(myService::MyService::Stub* stub) noexcept
: _stub{ stub }
{
auto context = grpc::ClientContext{};
auto request = myService::EmptyMessage{};
_stub->async()->StartStream(&context, &request, this);
StartRead(&_event); // Start reading (actually postponed until StartCall is called)
StartCall();
}
private:
virtual void OnReadDone(bool ok) override
{
if (ok)
{
std::cout << "StreamEvent: " << std::to_string(_event.value()) << " - " << std::to_string(_event.othervalue()) << " - " << (_event.yetanothervalue() ? "True" : "False") << std::endl;
// Read next event
StartRead(&_event);
}
}
myService::MyService::Stub* _stub{ nullptr };
myService::StreamEvent _event{};
};
int main(int /*argc*/, char** /*argv*/)
{
auto const ServerAddress = std::string{ "127.0.0.1:50051" };
auto channel = grpc::CreateChannel(ServerAddress, grpc::InsecureChannelCredentials());
auto stub = myService::MyService::NewStub(channel);
try
{
auto observer = Observer{ stub.get() };
for (auto i = 0; i < 10000; ++i)
{
auto context = grpc::ClientContext{};
auto request = myService::EmptyMessage{};
auto response = myService::PongResponse{};
stub->Ping(&context, request, &response);
std::cout << "Ping? " << (response.pong() ? "Yes" : "No") << std::endl;
}
}
catch (std::exception const& e)
{
std::cout << "RPC Failed: " << e.what() << std::endl;
}
return 0;
}
The crash occurs when the server writes the StreamEvent (inside the StartStream method) in one thread at the same time as a PongResponse (when returning from the Ping method) in another thread. But the crash occurs in the client code, in the RunInterceptor method.
Is this a bug or am I doing something wrong?
I'm using gRPC 1.41.0 (from vcpkg).
Thanks,
Chris