Hi community.
We are using the gRPC C++ async API on the server and Swift on the client.
Our goal is to read a file and stream it in chunks to the Swift clients.
#include "Streamer_grpc.h"

namespace Illuscio {

Streamer_GRPC::~Streamer_GRPC() {
  _server->Shutdown();
  // Always shut down the completion queue after the server, then drain it
  // until Next() returns false, so no tags are leaked.
  _queue->Shutdown();
  void* tag;
  bool ok;
  while ( _queue->Next( &tag, &ok ) ) {}
}

void Streamer_GRPC::Run( uint16_t port ) {
  std::string server_address = "0.0.0.0:" + std::to_string( port );
  ServerBuilder builder;
  builder.AddListeningPort( server_address, grpc::InsecureServerCredentials() );
  // Register "_service" as the instance through which we'll communicate with
  // clients. In this case it corresponds to an *asynchronous* service.
  builder.RegisterService( &_service );
  // Get hold of the completion queue used for the asynchronous communication
  // with the gRPC runtime.
  _queue = builder.AddCompletionQueue();
  // Finally assemble the server.
  _server = builder.BuildAndStart();
  std::cout << "Server listening on " << server_address << std::endl;
  // Proceed to the server's main loop.
  HandleRPCs();
}

Streamer_GRPC::TMPFileData::TMPFileData( PROTO_Streamer::AsyncService* service, ServerCompletionQueue* queue )
    : CallData { service, queue }, _responder { &_context } {
  _tag.id = MessageID::TMP_FILE;
  _tag.data = this;
  Proceed();
}

void Streamer_GRPC::TMPFileData::Proceed() {
  switch ( _status ) {
    case CallStatus::CREATE: {
      _status = CallStatus::OPEN_FILE;
      _service->RequestStreamFile( &_context, &_clientFileReq, &_responder, _cq, _cq, (void*) this );
      break;
    }
    case CallStatus::OPEN_FILE: {
      // A request has just arrived: spawn a fresh handler here so the next
      // client can be served concurrently. Doing this once per accepted call
      // (instead of on every PROCESS event, as before) avoids leaking one
      // handler object per written chunk.
      new TMPFileData { _service, _cq };
      myfile.open( _clientFileReq.file_path(), std::ios::binary );
      if ( myfile.is_open() ) {
        _status = CallStatus::PROCESS;
        std::cout << "File was successfully opened..." << std::endl;
        Proceed();
      } else {
        // File could not be opened: finish the call with a non-OK status.
        // The tag passed to Finish() must be this object (not nullptr) so
        // the resulting completion-queue event reaches the FINISH state
        // below and the object is deleted.
        std::cout << "File open operation failed..." << std::endl;
        _status = CallStatus::FINISH;
        _responder.Finish( Status::CANCELLED, this );
      }
      break;
    }
    case CallStatus::PROCESS: {
      // `buffer` is assumed to be a fixed-size member array declared in the
      // header. Read the next chunk first; gcount() then reports how many
      // bytes were actually extracted (possibly a short final chunk).
      // Checking eof() before reading would drop that final partial chunk.
      myfile.read( buffer, sizeof( buffer ) );
      const int read_size = static_cast<int>( myfile.gcount() );
      if ( read_size > 0 ) {
        _fileChunk.set_chunk_data( buffer, read_size );
        _responder.Write( _fileChunk, this );
      } else {
        std::cout << "EOF reached..." << std::endl;
        _status = CallStatus::FINISH;
        _responder.Finish( Status::OK, this );
      }
      break;
    }
    default: {
      delete this;
    }
  }
}

void Streamer_GRPC::HandleRPCs() {
  new TMPFileData { &_service, _queue.get() };
  void* tag;
  bool ok;
  while ( _queue->Next( &tag, &ok ) ) {
    static_cast<CallData*>( tag )->Proceed();
  }
}

} // namespace Illuscio
The method below calls the actual gRPC client method (the `StreamFile` RPC). It simply awaits `next()` on the stream: when the server writes a chunk, the call unblocks, the chunk is appended, and `next()` is awaited again until there is no more data.
1.- Unless we are missing something, we do not see the same performance as with the gRPC callbacks implementation. Since `next` is awaited, our understanding is that the client will not ask for the file's next chunk until the previous one has been written on the server's completion queue; only then is the client unblocked and able to request the next chunk (the iterator's `next` method), and so on, since only one outstanding write is permitted at a time.
2.- With callbacks, we were able to have a loop on the server iterating over the file and sending one chunk after another until reaching the end of the file. With that approach the client received the chunks very quickly, which is not happening with the gRPC async approach.
3.- And we are obliged to use the async approach, since we will be serving several clients at the same time.
Thanks in advance.