For an AsyncService RPC that returns a stream, Next in the event loop returns ok as false when an RPC served by another thread and completion queue is called

169 views
Skip to first unread message

Avinash Jagtap IN

unread,
Jun 22, 2021, 12:03:15 AM6/22/21
to grpc.io
We are facing an issue where Next on a completion queue returns ok as false whenever any other RPC is called.
We need to implement simple progress reporting for a long-running RPC, so we need two RPCs:
1. StartScan - triggers the long-running scan; it is served by a separate thread with its own completion queue.
2. RegisterScanProgress - registers for the callback that reports the status of the long-running scan; it is also served by a separate thread with its own completion queue.

If we call StartScan first, then cqProgress_ (the ServerCompletionQueue for RegisterScanProgress) behaves as expected. However, if we call RegisterScanProgress first, then immediately after the call to StartScan, Next on cqProgress_ returns ok as false.
We need help understanding this behavior: after calling RegisterScanProgress there could be a call to any other RPC, so we need its queue to keep functioning for progress reporting.
It would be a great help if someone could explain what we are missing.

Proto looks like following
// Scan service: one unary RPC to start a scan and one server-streaming RPC
// that reports its progress.
service ScanService{
    // Unary: triggers the long-running scan.
    rpc StartScan (ScanRequest) returns (ScanReply) {}
    // Server-streaming: the server sends one ScanProgressReply per progress update.
    rpc RegisterScanProgress (ScanProgressRequest) returns (stream ScanProgressReply) {}
}
// Request for StartScan.
message ScanRequest{
  string scanType = 1;
}
// Reply for StartScan.
message ScanReply{
  // Presumably true when a scan was already running when the request
  // arrived - confirm against the server implementation.
  bool scanAlreadyInProgress  = 1;
}
// Request for RegisterScanProgress; carries no fields.
message ScanProgressRequest {
}
// Lifecycle state of a scan as reported on the progress stream.
enum ScanStatus
{
  NOT_STARTED = 0;
  STARTED     = 1;
  IN_PROGRESS = 2;
  FINISHED_SUCCESS  = 3;
  FINISHED_FAILED   = 4;
}
// One progress update on the RegisterScanProgress stream.
message ScanProgressReply {
    ScanStatus status = 1;
    string details = 2;
}

this is how the server code looks

class ServerImpl
{
public:
  ~ServerImpl()
  {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
  }

  void Run()
  {
    //... other server config code
    grpc::EnableDefaultHealthCheckService(true);
    grpc::reflection::InitProtoReflectionServerBuilderPlugin();
    grpc::ServerBuilder builder;

    builder.AddListeningPort(server_address, serverCredentials);

    cq_ = builder.AddCompletionQueue();
    cqProgress_ = builder.AddCompletionQueue();

    // Use Keep-alive to stop initial slow calls
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 100000);
    builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
    builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
    builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);

    // Finally assemble the server.
    server_ = (builder.BuildAndStart());

    // kick start the queues for async RPCs
    HandleStartScan(cq_.get());
    HandleRegisterScanProgress(cqProgress_.get());
    progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, cq_.get());
    progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, cqProgress_.get());
    server_->Wait();
  }

private:
  void HandleStartScan(ServerCompletionQueue* event_queue)
  {
    new StartScanCallData(&scanAsync_, event_queue);
  }
  void HandleRegisterScanProgress(ServerCompletionQueue* event_queue)
  {
    new RegisterScanProgressCallData(&scanAsync_, event_queue);
  }
  void EventLoop(ServerCompletionQueue* event_queue)
  {
    void* tag; // uniquely identifies a request.
    bool ok;
    while (event_queue->Next(&tag, &ok))
    {
      IAsyncRpcDataAdapter* adapter = static_cast<IAsyncRpcDataAdapter*>(tag);
      if (ok)
      {
        adapter->Proceed();
      }
      else
      {
        std::cout << "OK is false" << std::endl;
        continue;
      }
    }
  }

  std::unique_ptr<ServerCompletionQueue> cq_;
  std::unique_ptr<ServerCompletionQueue> cqProgress_;
  napa::Nvbackend::AsyncService scanAsync_;
  std::unique_ptr<Server> server_;
  std::thread progressReportThread_;
  std::thread progressOtherRpcs_;
};

int main(int argc, char** argv)
{
  ServerImpl server;
  server.Run();
  return 0;
}

and this is how the service code looks

// Serving state of one async RPC call-data object.
enum CallStatus
{
  CREATE = 0,       // constructed; the next Proceed() requests an incoming RPC
  PROCESS = 1,      // a request is ready to be handled
  FINISH = 2,       // the final status was queued; the next Proceed() deletes the object
  PUSH_TO_BACK = 3  // a streamed write was queued; the next Proceed() re-arms the object via an alarm
};

// interface for service to handle the async RPC
// every RPC will need to implement this interface, so that GRPC server can
// call RPC without knowing its type.
class IAsyncRpcDataAdapter
{
public:
  virtual void Proceed() = 0;
  virtual ~IAsyncRpcDataAdapter() = default;

  protected:
    IAsyncRpcDataAdapter(ScanService* service, ServerCompletionQueue* cq)
      : scanAsync_(service)
      , cq_(cq)
      , status_(CREATE)
    {
    }
    ScanService* scanAsync_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    CallStatus status_; // The current serving state.
    grpc::Alarm alarm_;
};

class StartScanCallData : public IAsyncRpcDataAdapter
{
public:
  StartScanCallData(ScanService* service, ServerCompletionQueue* cq);
  void Proceed() override;

private:
  ScanRequest request_;
  ScanReply reply_;

  ServerAsyncResponseWriter<ScanReply> responder_;
  grpc::Status StartScan(grpc::ServerContext *context, const ScanRequest *request, ScanReply *reply);
};

class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
{
public:
  RegisterScanProgressCallData(ScanService* service, ServerCompletionQueue* cq);
  void Proceed() override;

private:
  ScanProgressRequest request_;
  ScanProgressReply reply_;
  ServerAsyncWriter<ScanProgressReply> responder_;

  void ProgressReport();
  void waitForScanEvent();
  std::unique_ptr<std::promise<AppScanStatus>> scanPromise_;
};

Vijay Pai

unread,
Jun 28, 2021, 5:00:44 PM6/28/21
to grpc.io
It's not really possible to say anything about this without knowing what's in "Proceed".

Avinash Jagtap IN

unread,
Jun 28, 2021, 11:30:30 PM6/28/21
to grpc.io
here is the complete code:
#include <stdlib.h>

#pragma region gRPC
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#pragma endregion gRPC

#include <atomic>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

#pragma region Generated Files
#include "napa_plugins_config.hpp"
#include "napa_plugins_core_ssl.hpp"
#pragma endregion Generated Files

#pragma region
#include "rapidjson/document.h"
#pragma endregion

#include <future>

#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/alarm.h>

#include "protos/PingPong.grpc.pb.h"
#include "protos/PingPong.pb.h"

using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerCompletionQueue;
using grpc::ServerAsyncResponseWriter;


class PingPongServiceImpl final
    : public ping_pong::PingPong::Service {
public:
    ~PingPongServiceImpl() {
        std::cout << "Destructor of PingPongServiceImpl" << std::endl;
    }
private:
    grpc::Status PingUnary(
        grpc::ServerContext* context,
        const ping_pong::PingPongRequest* request,
        ping_pong::PingPongReply* reply) override
    {
        std::cout << "PingPong" << std::endl;

        std::cout
            << "PingPong - input_msg = "
            << request->input_msg()
            << std::endl;

        if (request->input_msg() == "hello") {
            reply->set_output_msg("world");
        }
        else {
            reply->set_output_msg("I can't pong unless you ping me 'hello'!");
        }

        std::cout << "Replying with " << reply->output_msg() << std::endl;

        return grpc::Status::OK;
    }

};

typedef void ScanProgressCallbackType(_Inout_opt_ void* context, _In_z_ const std::string location);
typedef void ScanStartedCallbackType(_Inout_opt_ void* context);
typedef void ScanFinishedCallbackType(_Inout_opt_ void* context, _In_ bool scanFailed);

class DummyScanner
{
public:
    DummyScanner()
        :scanContext_(NULL),
        scanCount_(0),
        scanProgressCallback_(NULL),
        scanStartedCallback_(NULL),
        scanFinishedCallback_(NULL)
    {
    }
    ~DummyScanner()
    {
        if (dummyScanThread_.joinable())
        {
            dummyScanThread_.join();
        }
    }
    void RegisterScanProgressCallbacks(
        ScanProgressCallbackType* onScanProgress,
        ScanStartedCallbackType* onScanStarted,
        ScanFinishedCallbackType* onScanFinished,
        _In_opt_ void* context)
    {
        scanProgressCallback_ = onScanProgress;
        scanStartedCallback_ = onScanStarted;
        scanFinishedCallback_ = onScanFinished;
        scanContext_ = context;
    }

    bool StartScan()
    {
        if (scanCount_ > 0)
        {
            return false;
        }
        if (dummyScanThread_.joinable())
        {
            dummyScanThread_.join();
        }

        dummyScanThread_ = std::thread(&DummyScanner::dummyScanAndReport, this);
        return true;
    }

private:
    void dummyScanAndReport()
    {
        while (true)
        {
            if (scanCount_ == 100)
            {
                if (scanFinishedCallback_ != NULL)
                {
                    scanFinishedCallback_(scanContext_, false);
                }
                break;
            }
            if (scanCount_ == 1)
            {
                if (scanStartedCallback_ != NULL)
                {
                    scanStartedCallback_(scanContext_);
                }
            }
            if (scanCount_ < 100)
            {
                if (scanProgressCallback_ != NULL)
                {
                    std::stringstream ss;
                    ss << scanCount_;
                    scanProgressCallback_(scanContext_, ss.str());
                }
            }
            scanCount_++;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        scanCount_ = 0;
    }


private:
    void* scanContext_;
    int scanCount_;
    ScanProgressCallbackType* scanProgressCallback_;
    ScanStartedCallbackType* scanStartedCallback_;
    ScanFinishedCallbackType* scanFinishedCallback_;
    std::thread dummyScanThread_;
};

// Serving state of one async RPC call-data object.
enum CallStatus
{
    CREATE = 0,       // constructed; the next Proceed() requests an incoming RPC
    PROCESS = 1,      // a request is ready to be handled
    FINISH = 2,       // the final status was queued; the next Proceed() deletes the object
    PUSH_TO_BACK = 3  // a streamed write was queued; the next Proceed() re-arms the object via an alarm
};

// interface for service to handle the async RPC
// every RPC will need to implement this interface, so that GRPC server can
// call RPC without knowing its type.
class IAsyncRpcDataAdapter
{
public:
    virtual void Proceed() = 0;
    virtual std::string Name() = 0;
    virtual ~IAsyncRpcDataAdapter() = default;

protected:
    IAsyncRpcDataAdapter(ping_pong::PingPong::AsyncService* service, ServerCompletionQueue* cq)
        : pingPongAsync_(service)
        , cq_(cq)
        , status_(CREATE)
    {
    }
    ping_pong::PingPong::AsyncService* pingPongAsync_;
    ServerCompletionQueue* cq_;
    ServerContext ctx_;
    CallStatus status_; // The current serving state.
    grpc::Alarm alarm_;
};

class StartScanCallData : public IAsyncRpcDataAdapter
{
public:
    StartScanCallData(ping_pong::PingPong::AsyncService* service, ServerCompletionQueue* cq)
    {
        Proceed();
    }

    void Proceed() override
    {
        std::cout << __FUNCTION__ << std::endl;
        if (status_ == CREATE)
        {
            status_ = PROCESS;
            std::cout << __FUNCTION__ << ":- " << "CREATE this - " << this << std::endl;
            pingPongAsync_->RequestStartScan(&ctx_, &request_, &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
        }
        else if (status_ == PROCESS)
        {
            std::cout << __FUNCTION__ << ":- " << "PROCESS this - " << this << std::endl;
            // we need to keep an instance of api call data in the queue so that it keeps waiting in HandleRpcs.
            new StartScanCallData(pingPongAsync_, cq_);
            const Status status = StartScan(&ctx_, &request_, &reply_);
            if (status.ok())
            {
                status_ = FINISH;
                responder_.Finish(reply_, Status::OK, this);
            }
            else
            {
                responder_.FinishWithError(status, this);
            }
        }
        else
        {
            GPR_ASSERT(status_ == FINISH);
            std::cout << __FUNCTION__ << ":- " << "FINISH this - " << this << std::endl;
            delete this;
        }
    }

    DummyScanner scanner_;
    std::string Name() override
    {
        return "StartScanCallData";
    }

private:
    grpc::Status StartScan(grpc::ServerContext* context, const ping_pong::ScanRequest* request, ping_pong::ScanReply* reply)
    {
        std::cout << __FUNCTION__ << ":- " << "\n\n----------------------------------------" << std::this_thread::get_id() << std::endl;
        // start dummy thread callback scan
        scanner_.StartScan();
        reply->set_scanalreadyinprogress(true);
        return Status::OK;
    }
    ping_pong::ScanRequest request_;
    ping_pong::ScanReply reply_;

    ServerAsyncResponseWriter<ping_pong::ScanReply> responder_;
};

class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
{
public:
    RegisterScanProgressCallData(ping_pong::PingPong::AsyncService* service, ServerCompletionQueue* cq)
    {
        Proceed();
    }

    void Proceed() override
    {
        std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << std::endl;
        if (status_ == CREATE)
        {
            status_ = PROCESS;
            std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " CREATE this - " << this << std::endl;
            pingPongAsync_->RequestRegisterScanProgress(&ctx_, &request_, &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
        }
        else if (status_ == PROCESS)
        {
            std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " PROCESS this - " << this << std::endl;
            if (!callBackTriggered_)
            {
                // we need to keep an instance of api call data in the queue so that it keeps waiting in HandleRpcs.
                new RegisterScanProgressCallData(pingPongAsync_, cq_);
                const Status status = RegisterScanProgress();
                if (!status.ok())
                {
                    status_ = FINISH;
                    responder_.Finish(Status::OK, this);
                    return;
                }
                else
                {
                    scanStatus_ = ScanStatus::NOT_STARTED;
                }
                callBackTriggered_ = true;
            }
            else
            {
                waitForScanEvent();
            }
            std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " Calling ProgressReport" << std::endl;
            ProgressReport();
        }
        else if (status_ == PUSH_TO_BACK)
        {
            status_ = PROCESS;
            std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " ********************************* Setting alarm" << std::endl;
            alarm_.Set(cq_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
        }
        else
        {
            GPR_ASSERT(status_ == FINISH);
            std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " FINISH this - " << this << std::endl;
            delete this;
        }
    }

    std::string Name() override
    {
        return "RegisterScanProgressCallData****";
    }

private:
    grpc::Status RegisterScanProgress()
    {
        std::cout << __FUNCTION__ << ":- " << "---------------------------------------- thread - " << std::this_thread::get_id() << std::endl;
        scanner_.RegisterScanProgressCallbacks(RegisterScanProgressCallData::OnScanProgress,
            RegisterScanProgressCallData::OnScanStarted,
            RegisterScanProgressCallData::OnScanFinished,
            this);
        return Status::OK;
    }

    grpc::Status UnRegisterScanProgress()
    {
        return Status::OK;
    }

    void UpdateReply()
    {
        reply_.set_status(scanStatus_);
        reply_.set_location(scanLocation_);
    }

    void ProgressReport()
    {
        UpdateReply();
        std::cout << __FUNCTION__ << ":- " << "reporting ScanStatus - " << scanStatus_ << " thread - " << std::this_thread::get_id() << std::endl;
        if ((scanStatus_ == ScanStatus::FINISHED_SUCCESS)
            || (scanStatus_ == ScanStatus::FINISHED_FAILED))
        {
            status_ = FINISH;
            std::cout << __FUNCTION__ << ":- " << "writing to responder_ this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
            responder_.Finish(Status::OK, this);
        }
        else
        {
            status_ = PUSH_TO_BACK;
            std::cout << __FUNCTION__ << ":- " << "writing to responder_ this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
            responder_.Write(std::move(reply_), this);
        }
    }

    static void OnScanProgress(void* context, const std::string location)
    {
        try
        {
            auto _this = reinterpret_cast<RegisterScanProgressCallData*>(context);
            std::cout << __FUNCTION__ << ":- " << "thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
            _this->scanStatus_ = ScanStatus::IN_PROGRESS;
            _this->scanLocation_.assign(location);
            std::cout << __FUNCTION__ << ":- " << "triggering progress event thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
            _this->scanPromise_->set_value(ScanStatus::IN_PROGRESS);
        }
        catch (...)
        {
            // Avoid exception being thrown through DLL boundary.
        }
    }

    static void OnScanStarted(void* context)
    {
        try
        {
            auto _this = reinterpret_cast<RegisterScanProgressCallData*>(context);
            std::cout << __FUNCTION__ << ":- " << "thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
            _this->scanStatus_ = ScanStatus::STARTED;
            std::cout << __FUNCTION__ << ":- " << "triggering started event thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
            _this->scanPromise_->set_value(ScanStatus::STARTED);
        }
        catch (...)
        {
            // Avoid exception being thrown through DLL boundary.
        }
    }

    static void OnScanFinished(void* context, bool scanFailed)
    {
        try
        {
            auto _this = reinterpret_cast<RegisterScanProgressCallData*>(context);
            if ((_this->scanStatus_ != ScanStatus::FINISHED_FAILED) && (_this->scanStatus_ != ScanStatus::FINISHED_SUCCESS))
            {
                std::cout << __FUNCTION__ << ":- " << "thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
                _this->scanStatus_ = (scanFailed == true) ? ScanStatus::FINISHED_FAILED : ScanStatus::FINISHED_SUCCESS;
                std::cout << __FUNCTION__ << ":- " << "triggering finished event thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
                _this->scanPromise_->set_value(_this->scanStatus_);
            }
        }
        catch (...)
        {
            // Avoid exception being thrown through DLL boundary.
        }
    }


    void waitForScanEvent()
    {
        if (scanPromise_ != nullptr)
        {
            scanPromise_.release();
        }
        scanPromise_ = std::make_unique<std::promise<ScanStatus>>();
        auto scanFuture = scanPromise_->get_future();
        std::cout << __FUNCTION__ << ":- " << "waiting for started event with scanStatus_ - " << scanStatus_ << std::endl;
        auto eventData = scanFuture.get();
        scanStatus_ = eventData;
        std::cout << __FUNCTION__ << ":- " << "got started event now scanStatus_ - " << scanStatus_ << std::endl;
    }

    ping_pong::ScanProgressRequest request_;
    ping_pong::ScanProgressReply reply_;
    ServerAsyncWriter<ping_pong::ScanProgressReply> responder_;

    ping_pong::ScanStatus scanStatus_;
    std::string scanLocation_;
    bool callBackTriggered_ = false;
    std::unique_ptr<std::promise<ping_pong::ScanStatus>> scanPromise_;
};

class ServerImpl
{
public:
    ~ServerImpl()
    {
        server_->Shutdown();
        // Always shutdown the completion queue after the server.
        cq_->Shutdown();
    }

    void Run()
    {
        rapidjson::Document napaPluginConfig;
        napaPluginConfig.Parse(napa_plugin_config_json.c_str());
        int port(napaPluginConfig["port"].GetInt());
        std::string hostname(napaPluginConfig["hostname"].GetString());
        std::string server_address(hostname + ":" + std::to_string(port));

        // Check for Server Credentials
        // Comes as a string, we want a boolean.
        auto strGrpcUseInsecureCreds = getenv("NAPA_GRPC_USE_INSECURE_CREDS");
        // By default we will *not* use SSL for HTTP2.
        // TODO(Keith): Once Envoy understands SSL, revert back to false.
        bool bGrpcUseInsecureCreds = true;
        if (strGrpcUseInsecureCreds != NULL)
        {
            std::istringstream(strGrpcUseInsecureCreds) >> std::boolalpha >> bGrpcUseInsecureCreds;
        }

        std::cout
            << "Environment variable NAPA_GRPC_USE_INSECURE_CREDS = "
            << bGrpcUseInsecureCreds
            << std::endl;

        grpc::EnableDefaultHealthCheckService(true);
        grpc::reflection::InitProtoReflectionServerBuilderPlugin();
        grpc::ServerBuilder builder;

        std::shared_ptr<grpc::ServerCredentials> serverCredentials;

        if (bGrpcUseInsecureCreds)
        {
            // Listen on the given address without any authentication mechanism.
            serverCredentials = grpc::InsecureServerCredentials();
            std::cout
                << "NOTE: We are using gRPC insecure server credentials, without SSL."
                << "This means you are not using HTTP2"
                << std::endl;
        }
        else
        {
            grpc::SslServerCredentialsOptions sslOptions;
            sslOptions.pem_root_certs = ssl_credentials_ca_cert.c_str();
            sslOptions.pem_key_cert_pairs.push_back({ ssl_credentials_server_key.c_str(),
                                                     ssl_credentials_server_cert.c_str() });

            serverCredentials = grpc::SslServerCredentials(sslOptions);
            std::cout
                << "NOTE: We are using gRPC secure server credentials, with SSL."
                << "This means you are using HTTP2"
                << std::endl;
        }

        builder.AddListeningPort(server_address, serverCredentials);

        builder.RegisterService(&pingPongAsync_);

        cq_ = builder.AddCompletionQueue();
        cqProgress_ = builder.AddCompletionQueue();

        // Use Keep-alive to stop initial slow calls
        builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
        builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
        builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
        builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
        builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);

        // Finally assemble the server.
        server_ = (builder.BuildAndStart());
        std::cout
            << "Server listening on "
            << server_address
            << std::endl;

        // kick start the queues for async RPCs
        HandleStartScan();
        HandleRegisterScanProgress();
        progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, cq_.get());
        progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, cqProgress_.get());
        // Wait for the server to shutdown. Note that some other thread must be
        // responsible for shutting down the server for this call to ever return.
        server_->Wait();
    }

private:
    void HandleStartScan()
    {
        new StartScanCallData(&pingPongAsync_, cq_.get());
    }

    void HandleRegisterScanProgress()
    {
        new RegisterScanProgressCallData(&pingPongAsync_, cqProgress_.get());
    }

    // TODO - we'll need to have more than one event loops. running in thread for the RPC
    void EventLoop(ServerCompletionQueue* event_queue)
    {
        void* tag; // uniquely identifies a request.
        bool ok;

        while (event_queue->Next(&tag, &ok))
        {
            IAsyncRpcDataAdapter* adapter = static_cast<IAsyncRpcDataAdapter*>(tag);
            if (ok)
            {
                std::cout << __FUNCTION__ << ":- EventLoop event for  " << " thread - " << std::this_thread::get_id() << " queue " << event_queue << " tag " << adapter->Name() << std::endl;
                adapter->Proceed();
            }
            else
            {
                std::cout << __FUNCTION__ << ":- Need to see why this failed for " << " thread - " << std::this_thread::get_id() << " queue " << event_queue << " tag " << adapter->Name() << std::endl;
                adapter->Proceed();
                continue;
            }
        }
    }

    std::unique_ptr<Server> server_;
    std::unique_ptr<ServerCompletionQueue> cq_;
    std::unique_ptr<ServerCompletionQueue> cqProgress_;
    ping_pong::PingPong::AsyncService pingPongAsync_;
    std::thread progressReportThread_;
    std::thread progressOtherRpcs_;
};

int main(int argc, char** argv)
{
    ServerImpl server;
    server.Run();
    return 0;
}
// -------------------------------------------------------------------------
// proto file
syntax = "proto3";

package ping_pong;

// PingPong service: a unary ping, a unary RPC that starts a scan, and a
// server-streaming RPC that reports the scan's progress.
service PingPong {
  // Unary ping/pong exchange.
  rpc PingUnary (PingPongRequest) returns (PingPongReply) {}
  // Unary: triggers the long-running scan.
  rpc StartScan (ScanRequest) returns (ScanReply) {}
  // Server-streaming: the server sends one ScanProgressReply per progress update.
  rpc RegisterScanProgress (ScanProgressRequest) returns (stream ScanProgressReply) {}
}

// Request for PingUnary.
message PingPongRequest {
    string input_msg = 1;
}

// Reply for PingUnary.
message PingPongReply {
    string output_msg = 1;
}

// Request for StartScan.
message ScanRequest{
  // Not read by the server code in this listing.
  string scanType = 1;
}

// Reply for StartScan.
message ScanReply{
  // Presumably true when a scan was already running when the request
  // arrived - confirm against the server implementation.
  bool scanAlreadyInProgress  = 1;
}

// Request for RegisterScanProgress; carries no fields.
message ScanProgressRequest {
}

// Lifecycle state of a scan as reported on the progress stream.
enum ScanStatus
{
  NOT_STARTED = 0;
  STARTED     = 1;
  IN_PROGRESS = 2;
  FINISHED_SUCCESS  = 3;
  FINISHED_FAILED   = 4;
}

// One progress update on the RegisterScanProgress stream.
message ScanProgressReply {
    ScanStatus status = 1;
    // Location most recently reported by the scanner.
    string location = 2;
}



Avinash Jagtap IN

unread,
Jun 29, 2021, 12:07:57 AM6/29/21
to grpc.io
FYI - not sure if it matters, but we are testing this implementation using the Kreya tool, and there is an Envoy proxy between the client and the gRPC server.
Reply all
Reply to author
Forward
0 new messages