Async C++ server with a multi-threading approach


Pedro Alfonso

Oct 19, 2023, 11:03:42 AM
to grpc.io
Hello,

First let me explain what we have in our C++ gRPC Async server codebase:

- We have two unary RPCs.
- We have two server-streaming RPCs that account for over 95% of the clients' API consumption, so they are central to our streaming implementation.

Of the two streaming RPCs, the one below is the most critical to us:

// Inner class StreamAssetNodes
class StreamAssetNodes : public RequestBase {
public:
    StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner ), ownerClass( owner ) {
        owner_.grpc().service_.RequestStreamAssetNodes(
            &context_, &stream_, cq(), cq(),
            in_handle_.tag( Handle::Operation::CONNECT, [this, &owner]( bool ok, Handle::Operation /* op */ ) {
                LOG_DEBUG << "\n" + me( *this ) << "\n\n*****************************************************************\n"
                          << "- Processing a new connect from " << context_.peer()
                          << "\n\n*****************************************************************\n"
                          << endl;
                cout << "\n" + me( *this ) << "\n*****************************************************************\n"
                     << "- Processing a new connect from " << context_.peer()
                     << "\n*****************************************************************\n"
                     << endl;

                if ( !ok ) [[unlikely]] {
                    LOG_DEBUG << "The CONNECT-operation failed." << endl;
                    cout << "The CONNECT-operation failed." << endl;
                    return;
                }

                // Creates a new instance so the service can handle requests from a new client
                owner_.createNew<StreamAssetNodes>( owner );
                // Reads the request's parameters
                readNodeIds();
            } ) );
    }

private:
    // Objects and variables
    AsyncAssetStreamerManager& ownerClass;
    ::Illuscio::AssetNodeIds request_;
    ::Illuscio::AssetNodeComponent reply_;
    ::grpc::ServerContext context_;
    ::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> stream_ { &context_ };

    vector<string> nodeids_vector;
    // Contains the mapping for all the nodes of a set of assets
    json assetsNodeMapping;
    // Contains the mapping for all the nodes of a particular asset
    json assetNodeMapping;
    ifstream nodeFile;
    // Handle for messages coming in
    Handle in_handle_ { *this };
    // Handle for messages going out
    Handle out_handle_ { *this };

    int fileNumber = 0;
    // static constexpr so the same constant can size the buffer below
    static constexpr int chunk_size = 16 * 1024;
    char buffer[chunk_size];

    // Methods

    void readNodeIds() {
        // Reads the RPC request parameters
        stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ, [this]( bool ok, Handle::Operation op ) {
            if ( !ok ) [[unlikely]] { return; }

            // Copies the requested ids into the nodeids vector
            nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() );
            request_.clear_nodeids();

            if ( !nodeids_vector.empty() ) {
                ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid()];
                if ( ownerClass.assetNodeMapping.empty() ) {
                    stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID not found on the server..." ),
                        in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
                            if ( !ok ) [[unlikely]] {
                                LOG_DEBUG << "The FINISH request-operation failed." << endl;
                                cout << "The FINISH request-operation failed." << endl;
                            }

                            LOG_DEBUG << "Asset's UUID not found on the server: " << request_.uuid() << endl;
                            cout << "Asset's UUID not found on the server: " << request_.uuid() << endl;
                        } ) );
                    return;
                }

                writeNodeFile( nodeids_vector.front() );
            } else {
                stream_.Finish( grpc::Status( grpc::StatusCode::DATA_LOSS, "Asset's node ids are empty. Without node ids, node streaming can't start..." ),
                    in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
                        if ( !ok ) [[unlikely]] {
                            LOG_DEBUG << "The FINISH request-operation failed." << endl;
                            cout << "The FINISH request-operation failed." << endl;
                        }

                        LOG_DEBUG << "Asset's node ids came in empty on the request. Without node ids, node streaming can't start..." << endl;
                        cout << "Asset's node ids came in empty on the request. Without node ids, node streaming can't start..." << endl;
                    } ) );
            }
        } ) );
    }

    void writeNodeFile( const string& nodeId ) {
        // Opens the file that contains the requested node
        nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary );

        if ( !nodeFile.is_open() ) {
            LOG_DEBUG << "Asset's node file open operation failed for node: " << nodeId << endl;
            cout << "Asset's node file open operation failed for node: " << nodeId << endl;
            // Without this early return, a failed read() would never reach eof()
            // and the write loop below would re-enter itself forever
            finishIfDone();
            return;
        }

        splitFileAndWriteChunks();
    }

    void splitFileAndWriteChunks() {
        setReplyWithBuffer();

        stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE, [this]( bool ok, Handle::Operation op ) {
            // A failed WRITE means the stream is broken; don't keep writing
            if ( !ok ) [[unlikely]] { return; }

            if ( !nodeFile.eof() ) {
                splitFileAndWriteChunks();
            } else if ( !nodeids_vector.empty() ) {
                nodeFile.close();
                nodeids_vector.erase( nodeids_vector.begin() );

                if ( !nodeids_vector.empty() ) {
                    writeNodeFile( nodeids_vector.front() );
                } else {
                    finishIfDone();
                }
            }
        } ) );
    }

    void setReplyWithBuffer() {
        // Fills the read buffer
        nodeFile.read( buffer, chunk_size );

        // Prepares the reply with the chunk that was just read
        reply_.Clear();
        reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) );
    }

    // We wait until all incoming messages are received and all outgoing messages are sent
    // before we send the finish message.
    void finishIfDone() {
        stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
            if ( !ok ) [[unlikely]] {
                LOG_DEBUG << "The FINISH request-operation failed." << endl;
                cout << "The FINISH request-operation failed." << endl;
            }
        } ) );
    }
};

The idea behind the code above: the request is essentially an array of string ids (e.g. "1", "2", "3", ...; it is defined as a stream in protobuf), and each id points to a small file stored on the server. Once the request is read, the RPC takes the first id, opens the file it points to, and writes that file to the client in chunks as a streaming response. When the file is done, it takes the next id from the array and repeats until no ids are left.

From the client's perspective, a single client calls the RPC above with an ids array of roughly 500 elements (all distinct), and we expect on the order of 1,000 calls per second to this RPC.

We are using a shared completion queue for all RPCs, and we do not have a multi-threading approach.

With all of the above as background, we would like to know whether we can implement a more efficient approach for this RPC, probably based on multi-threading; that is our ultimate goal.

Questions:

1.- Can the gRPC team tell us how to approach this problem in detail, in a way that lets us use a multi-threading strategy?

2.- How can we use completion queues more efficiently along with these new threads?

We ask because we feel we are not leveraging the real power of gRPC for this specific use case.

Please let us know if you need more details from us.

Thanks in advance.

Pedro Alfonso

yas...@google.com

Oct 25, 2023, 2:17:23 PM
to grpc.io
We have been recommending the C++ callback API instead of the completion-queue-based API, since it is easier to use. All performance optimizations we are currently working on target the callback API.
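For reference, in the callback API the streaming RPC above becomes a reactor instead of a tag loop. A hedged sketch, assuming a proto-generated `CallbackService` exposing the same `StreamAssetNodes` method and the message types from the code above (the reactor wiring is illustrative and will not compile without the generated stubs):

```cpp
#include <grpcpp/grpcpp.h>

// Reactor for one StreamAssetNodes call. gRPC invokes the On*Done callbacks
// from its own internal threads, so the application manages no completion
// queues and no polling threads.
class StreamAssetNodesReactor
    : public grpc::ServerBidiReactor<Illuscio::AssetNodeIds, Illuscio::AssetNodeComponent> {
public:
    StreamAssetNodesReactor() { StartRead( &request_ ); }

    void OnReadDone( bool ok ) override {
        if ( !ok ) { Finish( grpc::Status::OK ); return; }
        // ...resolve request_.nodeids() to files, fill reply_ with the first chunk...
        StartWrite( &reply_ );
    }

    void OnWriteDone( bool ok ) override {
        if ( !ok ) { Finish( grpc::Status::CANCELLED ); return; }
        // ...fill reply_ with the next chunk and StartWrite() again, or when
        // all ids have been streamed:
        Finish( grpc::Status::OK );
    }

    void OnDone() override { delete this; }  // the reactor owns itself

private:
    Illuscio::AssetNodeIds request_;
    Illuscio::AssetNodeComponent reply_;
};

// In the service implementation (a CallbackService override):
// grpc::ServerBidiReactor<Illuscio::AssetNodeIds, Illuscio::AssetNodeComponent>*
// StreamAssetNodes( grpc::CallbackServerContext* /*ctx*/ ) override {
//     return new StreamAssetNodesReactor();
// }
```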

Pedro Alfonso

Oct 25, 2023, 9:21:28 PM
to grpc.io
Hi Yas,

First of all, thanks for coming back to us.
That is a really important comment. Please correct us if we are wrong: our understanding is that the C++ gRPC team is putting its effort into perfecting and optimizing the callback API approach? By the way, we agree that it is easier to use.

Kindly help us with these additional questions:

1.- By using the callback API, will we be able to serve different users concurrently the same way we do with our current implementation?
2.- Will we need to implement threading logic like the one we have, or is it not needed?

Thanks in advance.

Regards,

Pedro

yas...@google.com

Oct 26, 2023, 7:02:23 PM
to grpc.io
> the C++ gRPC team is putting its effort into perfecting and optimizing the callback API approach?
Yes

> 1.- By using the callback API approach, will we be able to serve different users concurrently the same way we do with our current implementation?
Yes
> 2.- Will we need to implement threading logic like the one we have, or is it not needed?
Not needed with the C++ callback API.
