INTERESTING !! - Seeing better performance in gRPC sync approach (callback APIs) than in async one

94 views
Skip to first unread message

Pedro Alfonso

unread,
Sep 1, 2023, 6:11:38 PM9/1/23
to grpc.io
Hi community.

We are using gRPC C++ Async API approach on the server and Swift on the client.
Our goal is to read and stream a file in chunks to the Swift clients.

This is our server code (code is really simple):
-------------------------------------------------------------------------

#include "Streamer_grpc.h"

namespace Illuscio {

Streamer_GRPC::~Streamer_GRPC() {
    _server->Shutdown();
    _queue->Shutdown();
};

void Streamer_GRPC::Run( uint16_t port ) {
    std::string server_address = "0.0.0.0:" + std::to_string( port );

    ServerBuilder builder;

    builder.AddListeningPort( server_address, grpc::InsecureServerCredentials() );
    // Register "service_" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService( &_service );
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    _queue = builder.AddCompletionQueue();
    // Finally assemble the server.
    _server = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRPCs();
}


Streamer_GRPC::TMPFileData::TMPFileData( PROTO_Streamer::AsyncService* service, ServerCompletionQueue* queue )
    : CallData { service, queue }, _responder { &_context } {
    _tag.id = MessageID::TMP_FILE;
    _tag.data = this;
    Proceed();
}

void Streamer_GRPC::TMPFileData::Proceed() {
    switch ( _status ) {
    case CallStatus::CREATE: {
        _status = CallStatus::OPEN_FILE;
        _service->RequestStreamFile( &_context, &_clientFileReq, &_responder, _cq, _cq, (void*) this );
        break;
    }

    case CallStatus::OPEN_FILE: {
        myfile.open(_clientFileReq.file_path(), std::ios::binary);
           
        if (myfile.is_open()) {
            _status = CallStatus::PROCESS;
            std::cout << "File was successfuly open..." << std::endl;
            static_cast<CallData*>(this)->Proceed();
        } else {
            // Handles file opening failure
            std::cout << "File open operation failed..." << std::endl;
            _responder.Finish(Status::CANCELLED, nullptr);
            return;
        }
   
        break;
    }

    case CallStatus::PROCESS: {
        new TMPFileData { _service, _cq };
   
        if (!myfile.eof()) {
            const int read_size = static_cast<int>(myfile.gcount());
            _fileChunk.set_chunk_data(buffer, read_size);
            _responder .Write(_fileChunk, this);
        } else {
            std::cout << "EOF reached..." << std::endl;
            _status = CallStatus::FINISH;
            _responder.Finish(Status::OK, this);
        }

        break;
    }  
   
    default: {
        delete this;
    }
    }
}

void Streamer_GRPC::HandleRPCs() {
    new TMPFileData { &_service, _queue.get() };
    void* tag;
    bool ok;

    while ( _queue->Next(&tag, &ok) ) {
       // GPR_ASSERT(_queue->Next(&tag, &ok));
       // GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}
};

This is our proto file:
--------------------------------
syntax = "proto3";
package Illuscio;

service PROTO_Streamer {
    rpc StreamFile(TMP_FileRequest) returns (stream TMP_FileChunk);
}

message TMP_FileRequest {
    string file_path = 1;
}

message TMP_FileChunk {
    bytes chunk_data = 1;
}

And lastly, this is our client code snippet:
-----------------------------------------------------------------

Below method calls the actual gRPC client method (`StreamFile` rpc). It actually just call next and wait for the server to write it and then the data is appended until there's no more data to append.

func getContentData(for sourceURL: URL) async throws -> Data {
        do {
            try connection.connect()
        } catch { throw MCError.serverConnectionError }

        // Make the gRPC call to fetch the file's data asynchronously
       
        Task {
            var receivedData = Data()
            var fileChunkIterator = (connection.client?.getAsset(with: sourceURL) as! GRPCAsyncResponseStream<Illuscio_TMP_FileChunk>).makeAsyncIterator()
            do {
                var fileChunk = try await fileChunkIterator.next()
                // Concurrently fetch and append chunks to receivedData
                while fileChunk != nil {
                    receivedData.append(fileChunk!.chunkData)
                    fileChunk = try await fileChunkIterator.next()
                }
               
                try? connection.disconnect()
            } catch (let error) {
                throw MCError.fileWriteToLocalFailed
            }
        }
            // Once the call completes, disconnect from the server
            return Data()
    }

Questions:
=========
1.- Unless we are missing something, we do not see the same performance as with the gRPC callbacks implementation. We know next is marked as await, so our understanding is it won't be asking for the file's next chunk until it is written on the `callbackqueue` server side, then we will be unblocked on client side and we can ask for the next chunk(iterator next method) and so on (since there's only 1 write task permitted at a time).....
2.- With callbacks, we were able to have a loop on the server iterating over a file and sending one chunk after the other until we reach the file's end. Because of this approach we were able to receive the chunks really really fast on the client side (which is not happening with the gRPC async approach).
3.- And we are obligated to use the Async approach since we will be serving several clients at the same time.
4.- Are we missing something?

Thanks in advance.
Reply all
Reply to author
Forward
0 new messages