// Inner class StreamAssetNodes
class StreamAssetNodes : public RequestBase {
public:
StreamAssetNodes( AsyncAssetStreamerManager& owner ) : RequestBase( owner ), ownerClass( owner ) {
owner_.grpc().service_.RequestStreamAssetNodes(
&context_, &stream_, cq(), cq(), in_handle_.tag( Handle::Operation::CONNECT, [this, &owner]( bool ok, Handle::Operation /* op */ ) {
LOG_DEBUG << "\n" + me( *this ) << "\n\n*****************************************************************\n"
<< "- Processing a new connect from " << context_.peer()
<< "\n\n*****************************************************************\n"
<< endl;
cout << "\n" + me( *this ) << "\n*****************************************************************\n"
<< "- Processing a new connect from " << context_.peer() << "\n*****************************************************************\n"
<< endl;
if ( !ok ) [[unlikely]] {
LOG_DEBUG << "The CONNECT-operation failed." << endl;
cout << "The CONNECT-operation failed." << endl;
return;
}
// Creates a new instance so the service can handle requests from a new client
owner_.createNew<StreamAssetNodes>( owner );
// Reads request's parameters
readNodeIds();
} ) );
}
private:
// Objects and variables
AsyncAssetStreamerManager& ownerClass;
::Illuscio::AssetNodeIds request_;
::Illuscio::AssetNodeComponent reply_;
::grpc::ServerContext context_;
::grpc::ServerAsyncReaderWriter<decltype( reply_ ), decltype( request_ )> stream_ { &context_ };
vector<string> nodeids_vector;
// Contains mapping for all the nodes of a set of assets
json assetsNodeMapping;
// Contains mapping for all the nodes of a particular asset
json assetNodeMapping;
ifstream nodeFile;
// Handle for messages coming in
Handle in_handle_ { *this };
// Handle for messages going out
Handle out_handle_ { *this };
int fileNumber = 0;
const int chunk_size = 16 * 1024;
char buffer[16 * 1024];
// Methods
void readNodeIds() {
// Reads RPC request parameters
stream_.Read( &request_, in_handle_.tag( Handle::Operation::READ, [this]( bool ok, Handle::Operation op ) {
if ( !ok ) [[unlikely]] { return; }
// Assigns the request to the nodeids vector
nodeids_vector.assign( request_.nodeids().begin(), request_.nodeids().end() );
request_.clear_nodeids();
if ( !nodeids_vector.empty() ) {
ownerClass.assetNodeMapping = ownerClass.assetsNodeMapping[request_.uuid()];
if ( ownerClass.assetNodeMapping.empty() ) {
stream_.Finish( grpc::Status( grpc::StatusCode::NOT_FOUND, "Asset's UUID not found in server..." ),
in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
if ( !ok ) [[unlikely]] {
LOG_DEBUG << "The FINISH request-operation failed." << endl;
cout << "The FINISH request-operation failed." << endl;
}
LOG_DEBUG << "Asset's UUID not found in server: " << request_.uuid() << endl;
cout << "Asset's UUID not found in server: " << request_.uuid() << endl;
} ) );
return;
}
writeNodeFile( nodeids_vector.front() );
} else {
stream_.Finish( grpc::Status( grpc::StatusCode::DATA_LOSS, "Asset' node ids empty. Without node ids node streaming can't start..." ),
in_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
if ( !ok ) [[unlikely]] {
LOG_DEBUG << "The FINISH request-operation failed.";
cout << "The FINISH request-operation failed.";
}
LOG_DEBUG << "Asset' node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
cout << "Asset' node ids coming empty on the request. Without node ids node streaming can't start..." << endl;
} ) );
}
} ) );
}
void writeNodeFile( const string& nodeId ) {
// Opens the file which contains the requested node
nodeFile.open( string( ownerClass.assetNodeMapping[nodeId] ), ios::binary );
if ( !nodeFile.is_open() ) {
LOG_DEBUG << "Asset's node file open operation failed for node:" << nodeId << endl;
cout << "Asset's node file open operation failed for node:" << nodeId << endl;
}
splitFileAndWriteChunks();
}
void splitFileAndWriteChunks() {
setReplyWithBuffer();
stream_.Write( reply_, out_handle_.tag( Handle::Operation::WRITE, [this]( bool ok, Handle::Operation op ) {
if ( !nodeFile.eof() ) {
splitFileAndWriteChunks();
} else if ( !nodeids_vector.empty() ) {
nodeFile.close();
nodeids_vector.erase( nodeids_vector.begin() );
if ( !nodeids_vector.empty() ) {
writeNodeFile( nodeids_vector.front() );
} else {
finishIfDone();
}
}
} ) );
}
void setReplyWithBuffer() {
// Fills read buffer
nodeFile.read( buffer, chunk_size );
// Prepare reply and start writing
reply_.Clear();
reply_.set_chunk_data( buffer, static_cast<int>( nodeFile.gcount() ) );
}
// We wait until all incoming messages are received and all outgoing messages are sent
// before we send the finish message.
void finishIfDone() {
stream_.Finish( grpc::Status::OK, out_handle_.tag( Handle::Operation::FINISH, [this]( bool ok, Handle::Operation /* op */ ) {
if ( !ok ) [[unlikely]] {
LOG_DEBUG << "The FINISH request-operation failed." << endl;
cout << "The FINISH request-operation failed." << endl;
}
} ) );
}
};