here is the complete code:
#include <stdlib.h>
#pragma region gRPC
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#pragma endregion gRPC
#include <iostream>
#include <memory>
#include <string>
#include <sstream>
#include <iomanip>
#include <thread>
#pragma region Generated Files
#include "napa_plugins_config.hpp"
#include "napa_plugins_core_ssl.hpp"
#pragma endregion Generated Files
#pragma region
#include "rapidjson/document.h"
#pragma endregion
#include <future>
#include <grpcpp/impl/codegen/service_type.h>
#include <grpcpp/alarm.h>
#include "protos/PingPong.grpc.pb.h"
#include "protos/PingPong.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::ServerCompletionQueue;
using grpc::ServerAsyncResponseWriter;
class PingPongServiceImpl final
: public ping_pong::PingPong::Service {
public:
~PingPongServiceImpl() {
std::cout << "Destructor of PingPongServiceImpl" << std::endl;
}
private:
grpc::Status PingUnary(
grpc::ServerContext* context,
const ping_pong::PingPongRequest* request,
ping_pong::PingPongReply* reply) override
{
std::cout << "PingPong" << std::endl;
std::cout
<< "PingPong - input_msg = "
<< request->input_msg()
<< std::endl;
if (request->input_msg() == "hello") {
reply->set_output_msg("world");
}
else {
reply->set_output_msg("I can't pong unless you ping me 'hello'!");
}
std::cout << "Replying with " << reply->output_msg() << std::endl;
return grpc::Status::OK;
}
};
typedef void ScanProgressCallbackType(_Inout_opt_ void* context, _In_z_ const std::string location);
typedef void ScanStartedCallbackType(_Inout_opt_ void* context);
typedef void ScanFinishedCallbackType(_Inout_opt_ void* context, _In_ bool scanFailed);
class DummyScanner
{
public:
DummyScanner()
:scanContext_(NULL),
scanCount_(0),
scanProgressCallback_(NULL),
scanStartedCallback_(NULL),
scanFinishedCallback_(NULL)
{
}
~DummyScanner()
{
if (dummyScanThread_.joinable())
{
dummyScanThread_.join();
}
}
void RegisterScanProgressCallbacks(
ScanProgressCallbackType* onScanProgress,
ScanStartedCallbackType* onScanStarted,
ScanFinishedCallbackType* onScanFinished,
_In_opt_ void* context)
{
scanProgressCallback_ = onScanProgress;
scanStartedCallback_ = onScanStarted;
scanFinishedCallback_ = onScanFinished;
scanContext_ = context;
}
bool StartScan()
{
if (scanCount_ > 0)
{
return false;
}
if (dummyScanThread_.joinable())
{
dummyScanThread_.join();
}
dummyScanThread_ = std::thread(&DummyScanner::dummyScanAndReport, this);
return true;
}
private:
void dummyScanAndReport()
{
while (true)
{
if (scanCount_ == 100)
{
if (scanFinishedCallback_ != NULL)
{
scanFinishedCallback_(scanContext_, false);
}
break;
}
if (scanCount_ == 1)
{
if (scanStartedCallback_ != NULL)
{
scanStartedCallback_(scanContext_);
}
}
if (scanCount_ < 100)
{
if (scanProgressCallback_ != NULL)
{
std::stringstream ss;
ss << scanCount_;
scanProgressCallback_(scanContext_, ss.str());
}
}
scanCount_++;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
scanCount_ = 0;
}
private:
void* scanContext_;
int scanCount_;
ScanProgressCallbackType* scanProgressCallback_;
ScanStartedCallbackType* scanStartedCallback_;
ScanFinishedCallbackType* scanFinishedCallback_;
std::thread dummyScanThread_;
};
enum CallStatus
{
CREATE,
PROCESS,
FINISH,
PUSH_TO_BACK
};
// interface for service to handle the async RPC
// every RPC will need to implement this interface, so that GRPC server can
// call RPC without knowing its type.
class IAsyncRpcDataAdapter
{
public:
virtual void Proceed() = 0;
virtual std::string Name() = 0;
virtual ~IAsyncRpcDataAdapter() = default;
protected:
IAsyncRpcDataAdapter(ping_pong::PingPong::AsyncService* service, ServerCompletionQueue* cq)
: pingPongAsync_(service)
, cq_(cq)
, status_(CREATE)
{
}
ping_pong::PingPong::AsyncService* pingPongAsync_;
ServerCompletionQueue* cq_;
ServerContext ctx_;
CallStatus status_; // The current serving state.
grpc::Alarm alarm_;
};
class StartScanCallData : public IAsyncRpcDataAdapter
{
public:
StartScanCallData(ping_pong::PingPong::AsyncService* service, ServerCompletionQueue* cq)
{
Proceed();
}
void Proceed() override
{
std::cout << __FUNCTION__ << std::endl;
if (status_ == CREATE)
{
status_ = PROCESS;
std::cout << __FUNCTION__ << ":- " << "CREATE this - " << this << std::endl;
pingPongAsync_->RequestStartScan(&ctx_, &request_, &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
}
else if (status_ == PROCESS)
{
std::cout << __FUNCTION__ << ":- " << "PROCESS this - " << this << std::endl;
// we need to keep an instance of api call data in the queue so that it keeps waiting in HandleRpcs.
new StartScanCallData(pingPongAsync_, cq_);
const Status status = StartScan(&ctx_, &request_, &reply_);
if (status.ok())
{
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
}
else
{
responder_.FinishWithError(status, this);
}
}
else
{
GPR_ASSERT(status_ == FINISH);
std::cout << __FUNCTION__ << ":- " << "FINISH this - " << this << std::endl;
delete this;
}
}
DummyScanner scanner_;
std::string Name() override
{
return "StartScanCallData";
}
private:
grpc::Status StartScan(grpc::ServerContext* context, const ping_pong::ScanRequest* request, ping_pong::ScanReply* reply)
{
std::cout << __FUNCTION__ << ":- " << "\n\n----------------------------------------" << std::this_thread::get_id() << std::endl;
// start dummy thread callback scan
scanner_.StartScan();
reply->set_scanalreadyinprogress(true);
return Status::OK;
}
ping_pong::ScanRequest request_;
ping_pong::ScanReply reply_;
ServerAsyncResponseWriter<ping_pong::ScanReply> responder_;
};
class RegisterScanProgressCallData : public IAsyncRpcDataAdapter
{
public:
RegisterScanProgressCallData(ping_pong::PingPong::AsyncService* service, ServerCompletionQueue* cq)
{
Proceed();
}
void Proceed() override
{
std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << std::endl;
if (status_ == CREATE)
{
status_ = PROCESS;
std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " CREATE this - " << this << std::endl;
pingPongAsync_->RequestRegisterScanProgress(&ctx_, &request_, &responder_, cq_, cq_, static_cast<IAsyncRpcDataAdapter*>(this));
}
else if (status_ == PROCESS)
{
std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " PROCESS this - " << this << std::endl;
if (!callBackTriggered_)
{
// we need to keep an instance of api call data in the queue so that it keeps waiting in HandleRpcs.
new RegisterScanProgressCallData(pingPongAsync_, cq_);
const Status status = RegisterScanProgress();
if (!status.ok())
{
status_ = FINISH;
responder_.Finish(Status::OK, this);
return;
}
else
{
scanStatus_ = ScanStatus::NOT_STARTED;
}
callBackTriggered_ = true;
}
else
{
waitForScanEvent();
}
std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " Calling ProgressReport" << std::endl;
ProgressReport();
}
else if (status_ == PUSH_TO_BACK)
{
status_ = PROCESS;
std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " ********************************* Setting alarm" << std::endl;
alarm_.Set(cq_, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}
else
{
GPR_ASSERT(status_ == FINISH);
std::cout << __FUNCTION__ << ":- " << " thread - " << std::this_thread::get_id() << " FINISH this - " << this << std::endl;
delete this;
}
}
std::string Name() override
{
return "RegisterScanProgressCallData****";
}
private:
grpc::Status RegisterScanProgress()
{
std::cout << __FUNCTION__ << ":- " << "---------------------------------------- thread - " << std::this_thread::get_id() << std::endl;
scanner_.RegisterScanProgressCallbacks(RegisterScanProgressCallData::OnScanProgress,
RegisterScanProgressCallData::OnScanStarted,
RegisterScanProgressCallData::OnScanFinished,
this);
return Status::OK;
}
grpc::Status UnRegisterScanProgress()
{
return Status::OK;
}
void UpdateReply()
{
reply_.set_status(scanStatus_);
reply_.set_location(scanLocation_);
}
void ProgressReport()
{
UpdateReply();
std::cout << __FUNCTION__ << ":- " << "reporting ScanStatus - " << scanStatus_ << " thread - " << std::this_thread::get_id() << std::endl;
if ((scanStatus_ == ScanStatus::FINISHED_SUCCESS)
|| (scanStatus_ == ScanStatus::FINISHED_FAILED))
{
status_ = FINISH;
std::cout << __FUNCTION__ << ":- " << "writing to responder_ this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
responder_.Finish(Status::OK, this);
}
else
{
status_ = PUSH_TO_BACK;
std::cout << __FUNCTION__ << ":- " << "writing to responder_ this- " << this << " thread -" << std::this_thread::get_id() << std::endl;
responder_.Write(std::move(reply_), this);
}
}
static void OnScanProgress(void* context, const std::string location)
{
try
{
auto _this = reinterpret_cast<RegisterScanProgressCallData*>(context);
std::cout << __FUNCTION__ << ":- " << "thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanStatus_ = ScanStatus::IN_PROGRESS;
_this->scanLocation_.assign(location);
std::cout << __FUNCTION__ << ":- " << "triggering progress event thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanPromise_->set_value(ScanStatus::IN_PROGRESS);
}
catch (...)
{
// Avoid exception being thrown through DLL boundary.
}
}
static void OnScanStarted(void* context)
{
try
{
auto _this = reinterpret_cast<RegisterScanProgressCallData*>(context);
std::cout << __FUNCTION__ << ":- " << "thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanStatus_ = ScanStatus::STARTED;
std::cout << __FUNCTION__ << ":- " << "triggering started event thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanPromise_->set_value(ScanStatus::STARTED);
}
catch (...)
{
// Avoid exception being thrown through DLL boundary.
}
}
static void OnScanFinished(void* context, bool scanFailed)
{
try
{
auto _this = reinterpret_cast<RegisterScanProgressCallData*>(context);
if ((_this->scanStatus_ != ScanStatus::FINISHED_FAILED) && (_this->scanStatus_ != ScanStatus::FINISHED_SUCCESS))
{
std::cout << __FUNCTION__ << ":- " << "thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanStatus_ = (scanFailed == true) ? ScanStatus::FINISHED_FAILED : ScanStatus::FINISHED_SUCCESS;
std::cout << __FUNCTION__ << ":- " << "triggering finished event thread - " << std::this_thread::get_id() << " this " << _this << std::endl;
_this->scanPromise_->set_value(_this->scanStatus_);
}
}
catch (...)
{
// Avoid exception being thrown through DLL boundary.
}
}
void waitForScanEvent()
{
if (scanPromise_ != nullptr)
{
scanPromise_.release();
}
scanPromise_ = std::make_unique<std::promise<ScanStatus>>();
auto scanFuture = scanPromise_->get_future();
std::cout << __FUNCTION__ << ":- " << "waiting for started event with scanStatus_ - " << scanStatus_ << std::endl;
auto eventData = scanFuture.get();
scanStatus_ = eventData;
std::cout << __FUNCTION__ << ":- " << "got started event now scanStatus_ - " << scanStatus_ << std::endl;
}
ping_pong::ScanProgressRequest request_;
ping_pong::ScanProgressReply reply_;
ServerAsyncWriter<ping_pong::ScanProgressReply> responder_;
ping_pong::ScanStatus scanStatus_;
std::string scanLocation_;
bool callBackTriggered_ = false;
std::unique_ptr<std::promise<ping_pong::ScanStatus>> scanPromise_;
};
class ServerImpl
{
public:
~ServerImpl()
{
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
void Run()
{
rapidjson::Document napaPluginConfig;
napaPluginConfig.Parse(napa_plugin_config_json.c_str());
int port(napaPluginConfig["port"].GetInt());
std::string hostname(napaPluginConfig["hostname"].GetString());
std::string server_address(hostname + ":" + std::to_string(port));
// Check for Server Credentials
// Comes as a string, we want a boolean.
auto strGrpcUseInsecureCreds = getenv("NAPA_GRPC_USE_INSECURE_CREDS");
// By default we will *not* use SSL for HTTP2.
// TODO(Keith): Once Envoy understands SSL, revert back to false.
bool bGrpcUseInsecureCreds = true;
if (strGrpcUseInsecureCreds != NULL)
{
std::istringstream(strGrpcUseInsecureCreds) >> std::boolalpha >> bGrpcUseInsecureCreds;
}
std::cout
<< "Environment variable NAPA_GRPC_USE_INSECURE_CREDS = "
<< bGrpcUseInsecureCreds
<< std::endl;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
grpc::ServerBuilder builder;
std::shared_ptr<grpc::ServerCredentials> serverCredentials;
if (bGrpcUseInsecureCreds)
{
// Listen on the given address without any authentication mechanism.
serverCredentials = grpc::InsecureServerCredentials();
std::cout
<< "NOTE: We are using gRPC insecure server credentials, without SSL."
<< "This means you are not using HTTP2"
<< std::endl;
}
else
{
grpc::SslServerCredentialsOptions sslOptions;
sslOptions.pem_root_certs = ssl_credentials_ca_cert.c_str();
sslOptions.pem_key_cert_pairs.push_back({ ssl_credentials_server_key.c_str(),
ssl_credentials_server_cert.c_str() });
serverCredentials = grpc::SslServerCredentials(sslOptions);
std::cout
<< "NOTE: We are using gRPC secure server credentials, with SSL."
<< "This means you are using HTTP2"
<< std::endl;
}
builder.AddListeningPort(server_address, serverCredentials);
builder.RegisterService(&pingPongAsync_);
cq_ = builder.AddCompletionQueue();
cqProgress_ = builder.AddCompletionQueue();
// Use Keep-alive to stop initial slow calls
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS, 10000);
builder.AddChannelArgument(GRPC_ARG_HTTP2_BDP_PROBE, 1);
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
// Finally assemble the server.
server_ = (builder.BuildAndStart());
std::cout
<< "Server listening on "
<< server_address
<< std::endl;
// kick start the queues for async RPCs
HandleStartScan();
HandleRegisterScanProgress();
progressOtherRpcs_ = std::thread(&ServerImpl::EventLoop, this, cq_.get());
progressReportThread_ = std::thread(&ServerImpl::EventLoop, this, cqProgress_.get());
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server_->Wait();
}
private:
void HandleStartScan()
{
new StartScanCallData(&pingPongAsync_, cq_.get());
}
void HandleRegisterScanProgress()
{
new RegisterScanProgressCallData(&pingPongAsync_, cqProgress_.get());
}
// TODO - we'll need to have more than one event loops. running in thread for the RPC
void EventLoop(ServerCompletionQueue* event_queue)
{
void* tag; // uniquely identifies a request.
bool ok;
while (event_queue->Next(&tag, &ok))
{
IAsyncRpcDataAdapter* adapter = static_cast<IAsyncRpcDataAdapter*>(tag);
if (ok)
{
std::cout << __FUNCTION__ << ":- EventLoop event for " << " thread - " << std::this_thread::get_id() << " queue " << event_queue << " tag " << adapter->Name() << std::endl;
adapter->Proceed();
}
else
{
std::cout << __FUNCTION__ << ":- Need to see why this failed for " << " thread - " << std::this_thread::get_id() << " queue " << event_queue << " tag " << adapter->Name() << std::endl;
adapter->Proceed();
continue;
}
}
}
std::unique_ptr<Server> server_;
std::unique_ptr<ServerCompletionQueue> cq_;
std::unique_ptr<ServerCompletionQueue> cqProgress_;
ping_pong::PingPong::AsyncService pingPongAsync_;
std::thread progressReportThread_;
std::thread progressOtherRpcs_;
};
int main(int argc, char** argv)
{
ServerImpl server;
server.Run();
return 0;
}
// -------------------------------------------------------------------------
// proto file
syntax = "proto3";
package ping_pong;
service PingPong {
rpc PingUnary (PingPongRequest) returns (PingPongReply) {}
rpc StartScan (ScanRequest) returns (ScanReply) {}
rpc RegisterScanProgress (ScanProgressRequest) returns (stream ScanProgressReply) {}
}
message PingPongRequest {
string input_msg = 1;
}
message PingPongReply {
string output_msg = 1;
}
message ScanRequest{
string scanType = 1;
}
message ScanReply{
bool scanAlreadyInProgress = 1;
}
message ScanProgressRequest {
}
enum ScanStatus
{
NOT_STARTED = 0;
STARTED = 1;
IN_PROGRESS = 2;
FINISHED_SUCCESS = 3;
FINISHED_FAILED = 4;
}
message ScanProgressReply {
ScanStatus status = 1;