I receive stream data from a source. Its total size is not known until the final processing step, but it is at least 10 GB. I have to send this large amount of data using `gRPC`.
I should mention that this `large amount of data` will be passed through `gRPC` when the processing of the `stream` is done. For that step, my plan is to store all the values in a `vector`.
##### To get an idea of how to send a large amount of data, I have looked at the following:
- [This question](https://stackoverflow.com/questions/68644134/how-to-split-long-messages-into-short-messages-in-grpc-c), where it is advised not to pass large data through `gRPC` and to use another messaging protocol instead. However, I am restricted to `gRPC` (at least for now).
- [This post](https://stackoverflow.com/questions/60538700/how-could-i-send-message-of-many-fields-with-c-grpc-stream), from which I tried to learn how a `chunk message` can be sent, but I am not sure whether it is related to my problem.
- [This blog post](https://jbrandhorst.com/post/grpc-binary-blob-stream/), which streams data using the `go` language.
- [This answer](https://stackoverflow.com/a/54334419/10634362), a `python` version of [the same blog post](https://jbrandhorst.com/post/grpc-binary-blob-stream/), but it is also incomplete.
- [The gRPC example](https://github.com/grpc/grpc/tree/master/examples/cpp/route_guide) could be a good start, but I cannot decode it due to my lack of C++ knowledge.
## From here on I have made a large update to the question, but its main theme has not changed
##### What I have done so far, and some points about my project. The [github repo is available here](https://github.com/atifkarim/gRPC_CPP_CMake/tree/streaming_chunk).
- A `Unary rpc` is present in the project.
- I know that my new `Bi directional rpc` will take some time. I want the `Unary rpc` not to wait for the completion of the `Bi directional rpc`. Right now I am working in a `synchronous way`, where the `Unary rpc` has to wait for the streaming one to complete before it can return its `status`.
- I have left the unnecessary lines out of the `C++ code`, but I am giving the whole `proto` files.
- `big_data.proto`
```proto
syntax = "proto3";
package demo_grpc;
message Large_Data {
repeated int32 large_data_collection = 1 [packed=true];
int32 data_chunk_number = 2;
}
```
- `addressbook.proto`
```proto
syntax = "proto3";
package demo_grpc;
import "myproto/big_data.proto";
message S_Response {
string name = 1;
string street = 2;
string zip = 3;
string city = 4;
string country = 5;
int32 double_init_val = 6;
}
message C_Request {
uint32 choose_area = 1;
string name = 2;
int32 init_val = 3;
}
service AddressBook {
rpc GetAddress(C_Request) returns (S_Response) {}
rpc Stream_Chunk_Service(stream Large_Data) returns (stream Large_Data) {}
}
```
- `client.cpp`
```cpp
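#include <big_data.pb.h>
#include <addressbook.grpc.pb.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/create_channel.h>
#include <cstdint>
#include <iostream>
#include <memory>
#include <numeric>
#include <string>
#include <vector>
using namespace std;
// Helper defined further below; declared here so that the client code can call it
uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num);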
// This function prompts the user to set the values for the required area
void Client_Request(demo_grpc::C_Request &request_)
{
    // do processing for unary rpc. Intentionally avoided here
}
// According to the client request, this function displays the values of the protobuf message
void Server_Response(demo_grpc::C_Request &request_, const demo_grpc::S_Response &response_)
{
    // do processing for unary rpc. Intentionally avoided here
}
// The following function builds a large vector, splits it into chunks and streams them from client to server
void Stream_Data_Chunk_Request(demo_grpc::Large_Data &request_,
                               demo_grpc::Large_Data &response_,
                               uint64_t preferred_chunk_size_in_kibyte)
{
    // A dummy vector of 1024 * 10 values; in the real case this will be the large data set's container
    std::vector<int32_t> large_vector(1024 * 10, 1);
    // Number of integers that one chunk holds; filled in by total_chunk_counter
    uint64_t preferred_chunk_size_in_kibyte_holds_integer_num = 0;
    // Total number of chunks needed for the whole vector
    uint32_t total_chunk = total_chunk_counter(large_vector.size(), preferred_chunk_size_in_kibyte, preferred_chunk_size_in_kibyte_holds_integer_num);
    // Create the channel, stub and stream once; every chunk is written to the same stream
    std::string ip_address = "localhost:50051";
    auto channel = grpc::CreateChannel(ip_address, grpc::InsecureChannelCredentials());
    std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
    grpc::ClientContext context;
    std::shared_ptr<::grpc::ClientReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data> > stream(stub->Stream_Chunk_Service(&context));
    for (uint32_t chunk_index = 0; chunk_index < total_chunk; chunk_index++)
    {
        uint64_t begin = chunk_index * preferred_chunk_size_in_kibyte_holds_integer_num;
        uint64_t end = begin + preferred_chunk_size_in_kibyte_holds_integer_num;
        // The last chunk may be shorter, so never read past the end of large_vector
        if (end > large_vector.size())
        {
            end = large_vector.size();
        }
        // Clearing is safe here because the previous Write() has already serialized the message
        request_.clear_large_data_collection();
        request_.set_data_chunk_number(static_cast<int32_t>(chunk_index));
        for (uint64_t i = begin; i < end; i++)
        {
            // the repeated field large_data_collection takes its values from large_vector
            request_.add_large_data_collection(large_vector[i]);
        }
        if (!stream->Write(request_))
        {
            break; // the stream has been closed
        }
    }
    // Tell the server that no more chunks will follow
    stream->WritesDone();
    // Writing everything before reading assumes that the server replies only after it has received all chunks
    while (stream->Read(&response_))
    {
        std::cout << "Received chunk " << response_.data_chunk_number() << std::endl;
    }
    grpc::Status status = stream->Finish();
    if (!status.ok())
    {
        std::cout << status.error_message() << std::endl;
    }
}
int main(int argc, char* argv[])
{
    std::string client_address = "localhost:50051";
    std::cout << "Address of client: " << client_address << std::endl;
    // The following part is for the Unary RPC
    demo_grpc::C_Request query;
    demo_grpc::S_Response result;
    Client_Request(query);
    // This part is for the streaming chunk data (Bi directional Stream RPC)
    demo_grpc::Large_Data stream_chunk_request_;
    demo_grpc::Large_Data stream_chunk_response_;
    uint64_t preferred_chunk_size_in_kibyte = 64;
    Stream_Data_Chunk_Request(stream_chunk_request_, stream_chunk_response_, preferred_chunk_size_in_kibyte);
    // Call
    auto channel = grpc::CreateChannel(client_address, grpc::InsecureChannelCredentials());
    std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
    grpc::ClientContext context;
    grpc::Status status = stub->GetAddress(&context, query, &result);
    // the following status is for the unary rpc, as far as I have understood the structure
    if (status.ok())
    {
        Server_Response(query, result);
    }
    else
    {
        std::cout << status.error_message() << std::endl;
    }
    return 0;
}
```
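
One way to keep the `Unary rpc` from waiting is to run the streaming call on its own thread. The following is only a minimal sketch under assumptions: `run_unary_while_streaming` is a hypothetical helper (it is not part of gRPC), and both callables stand in for the real calls. As far as I know, a gRPC C++ channel and stub may be shared between threads, while each call needs its own `grpc::ClientContext`.

```cpp
#include <future>
#include <utility>

// Hypothetical helper (not part of gRPC): starts `stream_fn` on its own thread,
// runs `unary_fn` on the calling thread, and returns both results.
template <typename StreamFn, typename UnaryFn>
auto run_unary_while_streaming(StreamFn stream_fn, UnaryFn unary_fn)
{
    // std::launch::async forces a separate thread instead of deferred execution
    auto stream_result = std::async(std::launch::async, std::move(stream_fn));
    // The unary call starts right away and does not wait for the streaming call
    auto unary_result = unary_fn();
    // Block only at the very end, when the streaming result is actually needed
    auto streamed = stream_result.get();
    return std::make_pair(std::move(unary_result), std::move(streamed));
}
```

In `main`, `stream_fn` could be a lambda wrapping `Stream_Data_Chunk_Request(...)` and `unary_fn` a lambda wrapping `stub->GetAddress(...)`. Both lambdas would have to return a value (for example a `grpc::Status`), because the helper stores the two results in a `std::pair`.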
- `helper function total_chunk_counter`
```cpp
#include <cstdint>
uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num)
{
    // One int32_t takes 4 bytes (32 bits), so convert the chunk size from KiB to a number of integers
    preferred_chunk_size_in_kibyte_holds_integer_num = (preferred_chunk_size_in_kibyte * 1024) / sizeof(int32_t);
    if (preferred_chunk_size_in_kibyte_holds_integer_num == 0)
    {
        return 0; // a chunk size of 0 KiB cannot hold any data
    }
    // Integer ceiling division; a float loses precision for containers with billions of elements
    uint64_t total_chunk = (num_of_container_content + preferred_chunk_size_in_kibyte_holds_integer_num - 1) / preferred_chunk_size_in_kibyte_holds_integer_num;
    return static_cast<uint32_t>(total_chunk);
}
```
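
As a quick sanity check of the chunk arithmetic, here is a small worked sketch. It assumes 4-byte `int32_t` values and 1 KiB = 1024 bytes; the names `elements_per_chunk` and `chunk_count` are made up for this illustration and are not in the project.

```cpp
#include <cstdint>

// Number of int32_t values that fit in a chunk of `chunk_kib` KiB (1 KiB = 1024 bytes)
constexpr uint64_t elements_per_chunk(uint64_t chunk_kib)
{
    return chunk_kib * 1024 / sizeof(int32_t);
}

// Chunks needed for `element_count` values, rounding up for a partial last chunk
constexpr uint64_t chunk_count(uint64_t element_count, uint64_t chunk_kib)
{
    return (element_count + elements_per_chunk(chunk_kib) - 1) / elements_per_chunk(chunk_kib);
}

// A 64 KiB chunk holds 16384 four-byte integers
static_assert(elements_per_chunk(64) == 16384, "unexpected chunk capacity");
// The 10240-element test vector is only 40 KiB, so it fits in a single chunk
static_assert(chunk_count(10240, 64) == 1, "unexpected chunk count");
// 10 GiB of int32_t values is 2684354560 elements, which gives 163840 chunks of 64 KiB
static_assert(chunk_count(10ULL * 1024 * 1024 * 1024 / sizeof(int32_t), 64) == 163840,
              "unexpected chunk count");
```

These numbers describe the values in memory. The size on the wire can differ, because protobuf encodes `int32` values as variable-length integers.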
- `server.cpp` which is totally incomplete
```cpp
#include <myproto/big_data.pb.h>
#include <myproto/addressbook.grpc.pb.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>
#include <iostream>
class AddressBookService final : public demo_grpc::AddressBook::Service {
public:
    ::grpc::Status GetAddress(::grpc::ServerContext* context, const ::demo_grpc::C_Request* request, ::demo_grpc::S_Response* response) override
    {
        switch (request->choose_area())
        {
            // do processing for unary rpc. Intentionally avoided here
            default:
                break;
        }
        std::cout << "Information of " << request->choose_area() << " is sent to Client" << std::endl;
        return grpc::Status::OK;
    }
    // Bi-directional streaming chunk data
    ::grpc::Status Stream_Chunk_Service(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data>* stream) override
    {
        // Not implemented yet: the chunks have to be read from `stream` here
        return grpc::Status::OK;
    }
};
void RunServer()
{
    std::cout << "grpc Version: " << grpc::Version() << std::endl;
    std::string server_address = "localhost:50051";
    std::cout << "Address of server: " << server_address << std::endl;
    grpc::ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    AddressBookService my_service;
    builder.RegisterService(&my_service);
    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    server->Wait();
}
int main(int argc, char* argv[])
{
    RunServer();
    return 0;
}
```
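
The body of `Stream_Chunk_Service` could follow a read, process, write pattern. The sketch below is not a tested gRPC implementation: it is written as a template so that it compiles without gRPC, and `FakeLargeData`, `FakeStream` and `double_and_stream_back` are hypothetical names introduced only for illustration. It assumes a stream type with `bool Read(Msg*)` and `bool Write(const Msg&)`, which is what `grpc::ServerReaderWriter` offers, and a message type with the accessors that `protoc` generates for `Large_Data`.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// In-memory stand-ins so that the sketch compiles without gRPC (illustration only)
struct FakeLargeData
{
    std::vector<int32_t> values;
    int32_t chunk_number = 0;
    const std::vector<int32_t>& large_data_collection() const { return values; }
    void add_large_data_collection(int32_t v) { values.push_back(v); }
    void clear_large_data_collection() { values.clear(); }
    void set_data_chunk_number(int32_t n) { chunk_number = n; }
};
struct FakeStream
{
    std::vector<FakeLargeData> inbox;   // chunks the "client" sent
    std::vector<FakeLargeData> outbox;  // chunks the handler wrote back
    std::size_t next = 0;
    bool Read(FakeLargeData* msg)
    {
        if (next >= inbox.size()) return false;
        *msg = inbox[next++];
        return true;
    }
    bool Write(const FakeLargeData& msg) { outbox.push_back(msg); return true; }
};

// Reads every chunk, doubles every value, and streams the result back in chunks.
// Returns false if the chunk size is zero or a write fails.
template <typename Msg, typename Stream>
bool double_and_stream_back(Stream* stream, std::size_t values_per_chunk)
{
    if (values_per_chunk == 0) return false;
    // 1. Read() returns false once the client has finished writing
    std::vector<int32_t> all_values;
    Msg incoming;
    while (stream->Read(&incoming))
    {
        // Messages on one stream arrive in the order they were written,
        // so appending keeps the original order of the data
        all_values.insert(all_values.end(),
                          incoming.large_data_collection().begin(),
                          incoming.large_data_collection().end());
    }
    // 2. Example processing (assumes the doubled value still fits in int32_t)
    for (int32_t& value : all_values)
    {
        value *= 2;
    }
    // 3. Send the result back chunk by chunk
    Msg outgoing;
    int32_t chunk_number = 0;
    for (std::size_t begin = 0; begin < all_values.size(); begin += values_per_chunk)
    {
        std::size_t end = begin + values_per_chunk;
        if (end > all_values.size()) end = all_values.size();
        outgoing.clear_large_data_collection();
        outgoing.set_data_chunk_number(chunk_number++);
        for (std::size_t i = begin; i < end; ++i)
        {
            outgoing.add_large_data_collection(all_values[i]);
        }
        if (!stream->Write(outgoing)) return false; // the client went away
    }
    return true;
}
```

Inside the real handler this would presumably be called as `double_and_stream_back<demo_grpc::Large_Data>(stream, values_per_chunk)`, returning `grpc::Status::OK` on success and an error status otherwise.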
#### In summary, what I want
- I need to pass the content of `large_vector` through the `repeated field large_data_collection` of the message `Large_Data`. I should split `large_vector` into `chunk`s and populate the `repeated field large_data_collection` with one `chunk size` of values at a time.
- On the server side, all `chunk`s will be concatenated while keeping the exact order of `large_vector`. Some processing will be done on them (e.g. `double the value of each index`). Then the whole data will be sent back to the `client` as a `chunk stream`.
- It would be great if the present `unary rpc` did not wait for the completion of the `bi-directional rpc`.
A solution with an example would be really helpful. Thanks in advance. The [github repo is available here](https://github.com/atifkarim/gRPC_CPP_CMake/tree/streaming_chunk).