C++ gRPC server side - Stream file in chunks to client


Pedro Alfonso

Aug 23, 2023, 6:55:39 PM
to grpc.io
I'm getting the error below ("Bus error") when running the C++ server:

dev-telemetry@telemetry build % ./file_streaming_server                        

Server listening on 0.0.0.0:8070

Buffer contents: 67 76 68 46 73 76 76 85 83 67 73 79 2 0 0 0 104 69 0 0 27 0 0 0 -35 -65 -93 2 0 0 0 0 -17 -63 88 71 0 0 0 0 16 8 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 100 -68 75 12 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 78 -65 109 76 53 0 0 -96 -123 19 0 0 0 0 0 4 -97 5 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 -96 -73 -31 -67 68 97 126 62 -120 67 -6 62 88 -80 -63 62 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 2 0 0 0 1 0 0 0 0 78 0 0 -40 83 0 0 -92 36 25 0 0 0 0 0 -56 -41 8 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 -96 -73 -31 -67 -128 96 -120 59 -48 104 122 62 112 66 9 62 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 3 0 0 0 2 0 0 0 0 78 0 0 25 35 0 0 108 -4 33 0 0 0 0 0 -93 -77 3 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 -96 -73 -31 -67 56 -104 -15 -67 97 -77 -6 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 4 0 0 0 3 0 0 0 0 78 0 0 106 18 0 0 15 -80 37 0 0 0 0 0 46 -15 1 0 0 0 0 0 60 -37 117 -66 -64 32 -107 57 0 81 73 -67 -84 83 55 -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 5 0 0 0 4 0 0 0 0 78 0 0 21 27 0 0 61 -95 39 0 0 0 0 0 55 -37 2 0 0 0 0 0 60 -37 117 -66 -61 114 -4 60 -64 -125 -104 -68 116 -105 86 -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 6 0 0 0 5 0 0 0 1 78 0 0 -55 0 0 0 116 124 42 0 0 0 0 0 51 21 0 0 0 0 0 0 60 -37 117 -66 -15 -64 60 61 -64 -125 -104 -68 88 57 102 -66 -127 72 123 61 0 -91 91 -69 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 7 0 0 0 5 0 0 0 0 78 0 0 83 15 0 0 -89 -111 42 0 0 0 0 0 -63 -99 1 0 0 0 0 0 60 -37 117 -66 -15 -64 60 61 0 -91 91 -69 88 57 102 -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 8 0 0 0 7 0 0 0 1 78 0 0 -91 2 0 0 104 47 44 0 0 0 0 0 103 71 0 0 0 0 0 0 74 10 110 -66 -15 -64 60 61 -64 75 -116 59 88 57 102 -66 -71 4 92 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 9 0 0 0 7 0 0 0 1 78 0 0 -103 9 0 0 -49 118 44 0 0 0 0 0 35 3 1 0 0 0 0 0 74 10 110 -66 -71 4 92 61 0 -91 91 -69 88 57 102 -66 -127 72 123 61 -64 75 -116 59 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 10 0 0 0 7 0 0 0 1 78 
0 0 59 16 0 0 -14 121 45 0 0 0 0 0 57 -74 1 0 0 0 0 0 74 10 110 -66 -71 4 92 61 -64 75 -116 59 88 57 102 -66 -127 72 123 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 11 0 0 0 5 0 0 0 0 78 0 0 52 26 0 0 43 48 47 0 0 0 0 0 124 -61 2 0 0 0 0 0 88 57 102 -66 -61 114 -4 60 0 -91 91 -69 116 -105 86 -66 -15 -64 60 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 12 0 0 0 11 0 0 0 1 78 0 0 6 0 0 0 -89 -13 49 0 0 0 0 0 -94 0 0 0 0 0 0 0 88 57 102 -66 -61 114 -4 60 -64 75 -116 59 102 104 94 -66 41 125 29 61 0 53 67 60 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 13 0 0 0 11 0 0 0 1 78 0 0 30 0 0 0 73 -12 49 0 0 0 0 0 42 3 0 0 0 0 0 0 88 57 102 -66 41 125 29 61 0 -91 91 -69 102 104 94 -66 -15 -64 60 61 -64 75 -116 59 0 0 0 0 0 0 0 0 

Buffer size: 1024 bytes

Read size: 1024 bytes

zsh: bus error  ./file_streaming_server

dev-telemetry@telemetry build % 

The output above shows the contents and size of the buffer that I'm trying to stream to the Swift client.

And here's the server code:

#include <iostream>
#include <memory>
#include <string>
#include <fstream>
#include <dirent.h>
#include <algorithm>

#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include "file_streaming.grpc.pb.h"

using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerUnaryReactor;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerWriter;
using grpc::Status;

class FileStreamingImpl final : public Illuscio::FileStreaming::CallbackService {
 public:
  grpc::ServerWriteReactor<Illuscio::FileChunk>* GetFile(
      grpc::CallbackServerContext* context,
      const Illuscio::FileRequest* request) override {
    class FileSplitter : public grpc::ServerWriteReactor<Illuscio::FileChunk> {
     private:
      const int chunk_size = 1024;
      char buffer[1024];
      std::ifstream file;

     public:
      FileSplitter(const std::string& file_path)
          : file(file_path, std::ios::binary) {
        try {
          if (!file) {
            Finish(grpc::Status(grpc::StatusCode::INTERNAL, "Error opening file"));
            return;
          }

          NextWrite();
        } catch (const std::exception& ex) {
          Finish(grpc::Status(grpc::StatusCode::INTERNAL, ex.what()));
        }
      }

      void OnDone() override { delete this; }
      void OnWriteDone(bool /*ok*/) override {
        std::cout << "Another write..." << std::endl;
        NextWrite();
      }

     private:
      void NextWrite() {
        file.read(buffer, chunk_size);
        const int read_size = static_cast<int>(file.gcount());

        if (read_size > 0) {
          Illuscio::FileChunk chunk;
          chunk.set_chunk_data(buffer, read_size);
          std::cout << "Buffer contents: ";
          for (int i = 0; i < read_size; ++i) {
            std::cout << static_cast<int>(buffer[i]) << " ";
          }
          std::cout << std::endl;
          std::cout << "Buffer size: " << sizeof(buffer) << " bytes" << std::endl;
          std::cout << "Read size: " << read_size << " bytes" << std::endl;

          StartWrite(&chunk);
        } else {
          // All data has been sent, finish the stream
          Finish(grpc::Status::OK);
        }
      }
    };

    return new FileSplitter(request->file_path());
  }

.......

I have tried several buffer sizes, from 1024 bytes to 1 MB, but I still get the "Bus error" message.

Any thoughts?


Thanks in advance.

Brad Town

Aug 30, 2023, 5:32:30 PM
to Pedro Alfonso, grpc.io
I think the problem is that the Illuscio::FileChunk object is created on the stack, StartWrite is called with that chunk, and the object goes out of scope. (I can't be sure because I don't know how Illuscio::FileChunk is implemented, but I'm assuming it's writing the data to a buffer internal to Illuscio::FileChunk.) I believe the write operation is trying to access a buffer that's no longer available.
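
A minimal sketch of that lifetime issue, for illustration only. It does not use the real gRPC types: `FakeChunk`, `FakeTransport`, `MemberChunkWriter`, and `SendAll` are hypothetical stand-ins. The assumption it models is the one the gRPC callback API documents for `StartWrite`: the library does not take ownership of the message, and the caller must keep it alive and unmodified until `OnWriteDone` is called. Holding the message as a data member of the reactor, instead of as a local variable in `NextWrite`, is one way to satisfy that.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a protobuf message such as Illuscio::FileChunk.
struct FakeChunk {
    std::string data;
};

// Hypothetical stand-in for the gRPC runtime: StartWrite() only records a
// pointer to the message; the bytes are read later, in Flush().
class FakeTransport {
public:
    void StartWrite(const FakeChunk* chunk) { pending_ = chunk; }

    // Simulates the runtime serializing the message after StartWrite() returned.
    void Flush() {
        if (pending_ != nullptr) {
            sent_.push_back(pending_->data);
            pending_ = nullptr;
        }
    }

    const std::vector<std::string>& sent() const { return sent_; }

private:
    const FakeChunk* pending_ = nullptr;
    std::vector<std::string> sent_;
};

// The message is a data member, so it outlives every NextWrite() call and is
// still valid when the transport reads it later.
class MemberChunkWriter {
public:
    explicit MemberChunkWriter(FakeTransport& transport) : transport_(transport) {}

    void NextWrite(const char* buffer, std::size_t size) {
        chunk_.data.assign(buffer, size);  // reuse the member for each write
        transport_.StartWrite(&chunk_);    // pointer remains valid after return
    }

private:
    FakeTransport& transport_;
    FakeChunk chunk_;
};

// Sends each piece in turn; Flush() plays the role of the "write done" point,
// and the next write is started only after it.
inline std::vector<std::string> SendAll(const std::vector<std::string>& pieces) {
    FakeTransport transport;
    MemberChunkWriter writer(transport);
    for (const std::string& piece : pieces) {
        writer.NextWrite(piece.data(), piece.size());
        transport.Flush();
    }
    return transport.sent();
}
```

In the server code above, the equivalent change would presumably be to make the `Illuscio::FileChunk` a member of `FileSplitter` next to `buffer` and `file`, and to pass its address to `StartWrite`.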

Brad Town


Pedro Alfonso

Aug 31, 2023, 2:47:01 PM
to grpc.io
Thanks, Brad. Yes, that is correct: I needed to instantiate it. Problem solved.

Pedro Alfonso

Sep 2, 2023, 9:15:05 AM
to grpc.io
Hi Brad,

We are just learning how to use gRPC and we are observing some hard-to-understand behavior. Let me give you the whole picture:

We are using the gRPC C++ async API on the server and Swift on the client.
Our goal is to read a file and stream it in chunks to the Swift clients.

This is our server code (code is really simple):
-------------------------------------------------------------------------

#include "Streamer_grpc.h"

namespace Illuscio {

Streamer_GRPC::~Streamer_GRPC() {
    _server->Shutdown();
    _queue->Shutdown();
};

void Streamer_GRPC::Run( uint16_t port ) {
    std::string server_address = "0.0.0.0:" + std::to_string( port );

    ServerBuilder builder;

    builder.AddListeningPort( server_address, grpc::InsecureServerCredentials() );
    // Register "_service" as the instance through which we'll communicate with
    // clients. In this case it corresponds to an *asynchronous* service.
    builder.RegisterService( &_service );
    // Get hold of the completion queue used for the asynchronous communication
    // with the gRPC runtime.
    _queue = builder.AddCompletionQueue();
    // Finally assemble the server.
    _server = builder.BuildAndStart();
    std::cout << "Server listening on " << server_address << std::endl;

    // Proceed to the server's main loop.
    HandleRPCs();
}


Streamer_GRPC::TMPFileData::TMPFileData( PROTO_Streamer::AsyncService* service, ServerCompletionQueue* queue )
    : CallData { service, queue }, _responder { &_context } {
    _tag.id = MessageID::TMP_FILE;
    _tag.data = this;
    Proceed();
}

void Streamer_GRPC::TMPFileData::Proceed() {
    switch ( _status ) {
    case CallStatus::CREATE: {
        _status = CallStatus::OPEN_FILE;
        _service->RequestStreamFile( &_context, &_clientFileReq, &_responder, _cq, _cq, (void*) this );
        break;
    }

    case CallStatus::OPEN_FILE: {
        myfile.open(_clientFileReq.file_path(), std::ios::binary);
           
        if (myfile.is_open()) {
            _status = CallStatus::PROCESS;
            std::cout << "File was successfully opened..." << std::endl;
            static_cast<CallData*>(this)->Proceed();
        } else {
            // Handles file opening failure
            std::cout << "File open operation failed..." << std::endl;
            _responder.Finish(Status::CANCELLED, nullptr);
            return;
        }
   
        break;
    }

    case CallStatus::PROCESS: {
        new TMPFileData { _service, _cq };
   
        if (!myfile.eof()) {
            const int read_size = static_cast<int>(myfile.gcount());
            _fileChunk.set_chunk_data(buffer, read_size);
            _responder.Write(_fileChunk, this);
        } else {
            std::cout << "EOF reached..." << std::endl;
            _status = CallStatus::FINISH;
            _responder.Finish(Status::OK, this);
        }

        break;
    }  
   
    default: {
        delete this;
    }
    }
}

void Streamer_GRPC::HandleRPCs() {
    new TMPFileData { &_service, _queue.get() };
    void* tag;
    bool ok;

    while ( _queue->Next(&tag, &ok) ) {
       // GPR_ASSERT(_queue->Next(&tag, &ok));
       // GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}
};
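
One thing that may be worth checking in the `PROCESS` branch as posted: it calls `myfile.gcount()` without a preceding `myfile.read(...)`, so unless the read happens in code not shown here, `gcount()` would return 0, each `Write` would carry an empty chunk, and `eof()` would never become true. Below is a minimal sketch of the read-then-check pattern, written against `std::istream` so it can be tried without gRPC. `ReadNextChunk` and `SplitIntoChunks` are hypothetical helper names, not part of the code above.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Reads up to chunk_size bytes from `in` into `out`.
// Returns true if at least one byte was read, false once the stream is exhausted.
inline bool ReadNextChunk(std::istream& in, std::size_t chunk_size, std::string& out) {
    out.resize(chunk_size);
    in.read(&out[0], static_cast<std::streamsize>(chunk_size));
    // gcount() reports how many bytes the read() just above actually extracted.
    const std::streamsize got = in.gcount();
    out.resize(static_cast<std::size_t>(got));
    return got > 0;
}

// Hypothetical driver standing in for the PROCESS -> Write -> PROCESS cycle:
// each iteration corresponds to one chunk that would be handed to Write().
inline std::vector<std::string> SplitIntoChunks(std::istream& in, std::size_t chunk_size) {
    std::vector<std::string> chunks;
    std::string chunk;
    while (ReadNextChunk(in, chunk_size, chunk)) {
        chunks.push_back(chunk);
    }
    return chunks;
}
```

Using the number of bytes actually read as the end-of-file signal, rather than `eof()` checked before the read, also handles a final chunk that is shorter than the buffer. In the state machine above, a `false` return would correspond to moving to `CallStatus::FINISH`.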

This is our proto file:
--------------------------------
syntax = "proto3";
package Illuscio;

service PROTO_Streamer {
    rpc StreamFile(TMP_FileRequest) returns (stream TMP_FileChunk);
}

message TMP_FileRequest {
    string file_path = 1;
}

message TMP_FileChunk {
    bytes chunk_data = 1;
}

And lastly, this is our client code snippet:
-----------------------------------------------------------------

The method below calls the actual gRPC client method (the `StreamFile` RPC). It simply calls `next()` and waits for the server to write a chunk, then appends the data, repeating until there is no more data to append.

func getContentData(for sourceURL: URL) async throws -> Data {
        do {
            try connection.connect()
        } catch { throw MCError.serverConnectionError }

        // Make the gRPC call to fetch the file's data asynchronously
       
        Task {
            var receivedData = Data()
            var fileChunkIterator = (connection.client?.getAsset(with: sourceURL) as! GRPCAsyncResponseStream<Illuscio_TMP_FileChunk>).makeAsyncIterator()
            do {
                var fileChunk = try await fileChunkIterator.next()
                // Concurrently fetch and append chunks to receivedData
                while fileChunk != nil {
                    receivedData.append(fileChunk!.chunkData)
                    fileChunk = try await fileChunkIterator.next()
                }
               
                try? connection.disconnect()
            } catch (let error) {
                throw MCError.fileWriteToLocalFailed
            }
        }
            // Once the call completes, disconnect from the server
            return Data()
    }


Recently we have been trying both approaches (async and callback), since we are streaming chunks of a file to the Swift clients (our server is C++).
Our observations are:
1.- The byte rate received on the client when using callbacks is between 25 and 40 Mb/s.
2.- The byte rate received on the client when using the async API is between 0 and 97 Kb/s (which is not good).
We honestly expected the async API to perform even better, which doesn't seem to be the case.

Any idea what we are missing?
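
For context when comparing numbers like these: with the completion-queue API only one `Write` may be outstanding per stream, so every chunk costs one trip through `Next()` before the following chunk can be sent. The arithmetic below is a rough sketch of how the number of writes scales with chunk size. `WritesNeeded` is a hypothetical helper, and the file and chunk sizes in the note after it are illustrative assumptions, not measurements from this thread. It does not by itself explain the gap between the two APIs, since the callback API has the same one-write-at-a-time rule.

```cpp
#include <cassert>
#include <cstdint>

// Number of Write() calls needed to send file_size bytes in chunks of at most
// chunk_size bytes (ceiling division). chunk_size must be greater than zero.
inline std::uint64_t WritesNeeded(std::uint64_t file_size, std::uint64_t chunk_size) {
    assert(chunk_size > 0);
    return (file_size + chunk_size - 1) / chunk_size;
}
```

For example, a 100 MiB file (104,857,600 bytes) takes 102,400 writes with 1024-byte chunks but only 100 writes with 1 MiB chunks, so per-write overhead weighs far more heavily at the small chunk size.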