[c++] RunInterceptor crash

31 views
Skip to first unread message

Christophe Calmejane

unread,
Oct 18, 2022, 6:48:27 AM10/18/22
to grpc.io
Hi,

I managed to write a minimal client/server example that causes a crash in the client code in RunInterceptor and I was wondering if I'm doing something wrong.

Proto File:
syntax = "proto3";

package myService;

service MyService
{
  rpc Ping(EmptyMessage) returns (PongResponse) {}
  rpc StartStream(EmptyMessage) returns (stream StreamEvent) {}
}

message EmptyMessage
{
}

message PongResponse
{
  bool pong = 1;
}

message StreamEvent
{
  uint64 value = 1;
  uint32 otherValue = 2;
  bool yetAnotherValue = 3;
}

Server File:
#ifdef _WIN32
#    pragma warning(push)
#    pragma warning(disable : 4251) // Disable dll-class warning
#    pragma warning(disable : 4127) // Disable conditional expression is constant
#endif
#include "service.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#ifdef _WIN32
#    pragma warning(pop)
#endif

#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <memory>

class Service final : public myService::MyService::Service
{
public:
    Service(std::string const& uri)
    {
        _builder.AddListeningPort(uri, grpc::InsecureServerCredentials());
        _builder.RegisterService(this);
    }

    void start()
    {
        _server = _builder.BuildAndStart();
    }

    void stop()
    {
        if (_server != nullptr)
        {
            _server->Shutdown();
        }
    }

private:
    virtual ::grpc::Status Ping(::grpc::ServerContext* /*context*/, const ::myService::EmptyMessage* /*request*/, ::myService::PongResponse* response) override
    {
        std::cout << "Ping request\n";
        response->set_pong(true);
        return grpc::Status::OK;
    }
    virtual ::grpc::Status StartStream(::grpc::ServerContext* /*context*/, const ::myService::EmptyMessage* /*request*/, ::grpc::ServerWriter<::myService::StreamEvent>* writer) override
    {
        {
            auto evt = myService::StreamEvent{};
            evt.set_value(0x1);
            evt.set_othervalue(0x2);
            evt.set_yetanothervalue(false);
            writer->Write(evt);
        }

        return grpc::Status::OK;
    }

    grpc::ServerBuilder _builder{ grpc::ServerBuilder{} };
    std::unique_ptr<grpc::Server> _server{ nullptr };
};


int main(int /*argc*/, char** /*argv*/)
{
    auto const ServerAddress = std::string{ "0.0.0.0:50051" };

    auto service = Service{ ServerAddress };

    service.start();
    std::cout << "Server listening on " << ServerAddress << std::endl;

    std::this_thread::sleep_for(std::chrono::seconds(3000));

    service.stop();

    return 0;
}


Client File:
#ifdef _WIN32
#    pragma warning(push)
#    pragma warning(disable : 4251) // Disable dll-class warning
#    pragma warning(disable : 4127) // Disable conditional expression is constant
#    pragma warning(disable : 4702) // Disable unreachable code
#endif
#include "service.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#ifdef _WIN32
#    pragma warning(pop)
#endif

#include <iostream>
#include <string>

class Observer : public grpc::ClientReadReactor<myService::StreamEvent>
{
public:
    Observer(myService::MyService::Stub* stub) noexcept
        : _stub{ stub }
    {
        auto context = grpc::ClientContext{};
        auto request = myService::EmptyMessage{};
        _stub->async()->StartStream(&context, &request, this);
        StartRead(&_event); // Start reading (actually postponed until StartCall is called)
        StartCall();
    }

private:
    virtual void OnReadDone(bool ok) override
    {
        if (ok)
        {
            std::cout << "StreamEvent: " << std::to_string(_event.value()) << " - " << std::to_string(_event.othervalue()) << " - " << (_event.yetanothervalue() ? "True" : "False") << std::endl;

            // Read next event
            StartRead(&_event);
        }
    }

    myService::MyService::Stub* _stub{ nullptr };
    myService::StreamEvent _event{};
};

int main(int /*argc*/, char** /*argv*/)
{
    auto const ServerAddress = std::string{ "127.0.0.1:50051" };

    auto channel = grpc::CreateChannel(ServerAddress, grpc::InsecureChannelCredentials());
    auto stub = myService::MyService::NewStub(channel);

    try
    {
        auto observer = Observer{ stub.get() };

        for (auto i = 0; i < 10000; ++i)
        {
            auto context = grpc::ClientContext{};
            auto request = myService::EmptyMessage{};
            auto response = myService::PongResponse{};
            stub->Ping(&context, request, &response);
            std::cout << "Ping? " << (response.pong() ? "Yes" : "No") << std::endl;
        }
    }
    catch (std::exception const& e)
    {
        std::cout << "RPC Failed: " << e.what() << std::endl;
    }
    return 0;
}


The crash occurs when the server writes the StreamEvent (inside the StartStream method) in one thread at the same time as a PongResponse (when returning from the Ping method) in another thread. But the crash occurs in the client code, in the RunInterceptor method.

Is this a bug or am I doing something wrong?
I'm using gRPC 1.41.0 (from vcpkg).
Thanks,
Chris

Reply all
Reply to author
Forward
0 new messages