multiple async clients for route_guide_callback_server example

Dmitry Gorelov

Mar 21, 2023, 6:12:55 PM
to grpc.io
Hi All,

Please help me modify this piece of server code for the bidirectional stream so that it works correctly with multiple clients at the same time. Currently it crashes with a segmentation fault in proto_utils.h.

class RouteGuideImpl final : public RouteGuide::CallbackService {
 public:
  explicit RouteGuideImpl(const std::string& db) {
    routeguide::ParseDb(db, &feature_list_);
  }  

  grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
      CallbackServerContext* context) override {
    class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
     public:
      Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
          : mu_(mu), received_notes_(received_notes) {
        StartRead(&note_);
      }

      void OnDone() override { delete this; }
      void OnReadDone(bool ok) override {
        if (ok) {
          // Unlike the other example in this directory that's not using
          // the reactor pattern, we can't grab a local lock to secure the
          // access to the notes vector, because the reactor will most likely
          // make us jump threads, so we'll have to use a different locking
          // strategy. We'll grab the lock locally to build a copy of the
          // list of nodes we're going to send, then we'll grab the lock
          // again to append the received note to the existing vector.
          mu_->Lock();
          std::copy_if(received_notes_->begin(), received_notes_->end(),
                       std::back_inserter(to_send_notes_),
                       [this](const RouteNote& note) {
                         return note.location().latitude() ==
                                    note_.location().latitude() &&
                                note.location().longitude() ==
                                    note_.location().longitude();
                       });
          mu_->Unlock();
          notes_iterator_ = to_send_notes_.begin();
          NextWrite();
        } else {
          std::cout << "some client finished" << std::endl;
          Finish(Status::OK);
        }
      }
      void OnWriteDone(bool /*ok*/) override { NextWrite(); }

     private:
      void NextWrite() {
        if (notes_iterator_ != to_send_notes_.end()) {
          StartWrite(&*notes_iterator_);
          notes_iterator_++;
        } else {
          mu_->Lock();
          received_notes_->push_back(note_);
          mu_->Unlock();
          StartRead(&note_);
        }
      }
      RouteNote note_;
      absl::Mutex* mu_;
      std::vector<RouteNote>* received_notes_;
      std::vector<RouteNote> to_send_notes_;
      std::vector<RouteNote>::iterator notes_iterator_;
    };
    return new Chatter(&mu_, &received_notes_);
  }

 private:
  std::vector<Feature> feature_list_;
  absl::Mutex mu_;
  std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
};
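
One ordering hazard worth ruling out in `NextWrite()` above: `notes_iterator_` is incremented after `StartWrite()` has been called, so if the write completes quickly, `OnWriteDone()` may re-enter `NextWrite()` while the first call is still touching the iterator. Whether this is the actual cause of the crash is not confirmed in this thread. The sketch below uses only the standard library; `FakeTransport` and `Sender` are hypothetical stand-ins (not gRPC types), and the fake transport completes each write inline to model the worst-case ordering. It shows the "advance first, then start the write" pattern.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the write path of a reactor. It completes every
// write before StartWrite returns, which is the most hostile ordering for
// code that still touches its cursor after starting a write.
class FakeTransport {
 public:
  void StartWrite(const std::string* msg,
                  const std::function<void()>& on_done) {
    sent.push_back(*msg);
    on_done();  // completion fires before StartWrite returns
  }
  std::vector<std::string> sent;
};

// Hypothetical stand-in for the Chatter reactor's write loop.
class Sender {
 public:
  Sender(FakeTransport* transport, std::vector<std::string> notes)
      : transport_(transport), notes_(std::move(notes)), next_(0) {}

  void NextWrite() {
    if (next_ == notes_.size()) return;  // nothing left to send
    const std::string* msg = &notes_[next_];
    ++next_;  // advance first; no member is touched after StartWrite
    transport_->StartWrite(msg, [this] { NextWrite(); });
  }

 private:
  FakeTransport* transport_;
  std::vector<std::string> notes_;
  std::size_t next_;
};

// Sends every note once and returns what the fake transport observed.
std::vector<std::string> SendAll(std::vector<std::string> notes) {
  FakeTransport transport;
  Sender sender(&transport, std::move(notes));
  sender.NextWrite();
  return transport.sent;
}
```

With the original ordering (start the write, then increment), this inline-completing transport would keep sending the first element; with the cursor advanced first, each note is sent exactly once. In the real reactor the equivalent change would be to take the pointer, increment `notes_iterator_`, and only then call `StartWrite()`.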

Dmitry Gorelov

Mar 21, 2023, 6:42:07 PM
to grpc.io
Oh man, it does not work even with one client! Same problem in proto_utils.h.


On Wednesday, March 22, 2023 at 01:12:55 UTC+3, Dmitry Gorelov wrote:

Dmitry Gorelov

Mar 21, 2023, 6:58:41 PM
to grpc.io
// This is the server code for the bidirectional gRPC stream
/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <algorithm>
#include <chrono>
#include <cmath>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

#include "helper.h"

#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#ifdef BAZEL_BUILD
#include "examples/protos/route_guide.grpc.pb.h"
#else
#include "route_guide.grpc.pb.h"
#endif

using grpc::CallbackServerContext;
using grpc::Server;
using grpc::ServerBuilder;
using grpc::Status;
using routeguide::Feature;
using routeguide::Point;
using routeguide::Rectangle;
using routeguide::RouteGuide;
using routeguide::RouteNote;
using routeguide::RouteSummary;
using std::chrono::system_clock;
void RunServer(const std::string& db_path) {
  std::string server_address("0.0.0.0:50051");
  RouteGuideImpl service(db_path);

  ServerBuilder builder;
  builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
  builder.RegisterService(&service);
  std::unique_ptr<Server> server(builder.BuildAndStart());
  std::cout << "Server listening on " << server_address << std::endl;
  server->Wait();
}

int main(int argc, char** argv) {
  // Expect only arg: --db_path=path/to/route_guide_db.json.
  std::string db = routeguide::GetDbFileContent(argc, argv);
  RunServer(db);

  return 0;
}

On Wednesday, March 22, 2023 at 01:42:07 UTC+3, Dmitry Gorelov wrote:

Dmitry Gorelov

Mar 21, 2023, 6:59:12 PM
to grpc.io
// And this is the client code

/*
 *
 * Copyright 2021 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>

#include <string>
#include <thread>

#include "helper.h"

#include <grpc/grpc.h>
#include <grpcpp/alarm.h>
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/security/credentials.h>

#ifdef BAZEL_BUILD
#include "examples/protos/route_guide.grpc.pb.h"
#else
#include "route_guide.grpc.pb.h"
#endif

using grpc::Channel;
using grpc::ClientContext;

using grpc::Status;
using routeguide::Feature;
using routeguide::Point;
using routeguide::Rectangle;
using routeguide::RouteGuide;
using routeguide::RouteNote;
using routeguide::RouteSummary;

Point MakePoint(long latitude, long longitude) {
  Point p;
  p.set_latitude(latitude);
  p.set_longitude(longitude);
  return p;
}

Feature MakeFeature(const std::string& name, long latitude, long longitude) {
  Feature f;
  f.set_name(name);
  f.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
  return f;
}

RouteNote MakeRouteNote(const std::string& message, long latitude,
                        long longitude) {
  RouteNote n;
  n.set_message(message);
  n.mutable_location()->CopyFrom(MakePoint(latitude, longitude));
  return n;
}

class RouteGuideClient {
 public:
  RouteGuideClient(std::shared_ptr<Channel> channel, const std::string& db)
      : stub_(RouteGuide::NewStub(channel)) {
    routeguide::ParseDb(db, &feature_list_);
  }

  void RouteChat() {
    class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote> {
     public:
      explicit Chatter(RouteGuide::Stub* stub)
          : notes_{MakeRouteNote("First message", 0, 0),
                   MakeRouteNote("Second message", 0, 1),
                   MakeRouteNote("Third message", 1, 0),
                   MakeRouteNote("Fourth message", 0, 0)},
            notes_iterator_(notes_.begin()) {
        stub->async()->RouteChat(&context_, this);
        NextWrite();
        StartRead(&server_note_);
        StartCall();

      }
      void OnWriteDone(bool /*ok*/) override { NextWrite(); }
      void OnReadDone(bool ok) override {
        if (ok) {
          std::cout << "Got message " << server_note_.message() << " at "
                    << server_note_.location().latitude() << ", "
                    << server_note_.location().longitude() << std::endl;
          StartRead(&server_note_);
        }
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        return std::move(status_);
      }

     private:
      void NextWrite() {
        if (notes_iterator_ != notes_.end()) {
          const auto& note = *notes_iterator_;
          std::cout << "Sending message " << note.message() << " at "
                    << note.location().latitude() << ", "
                    << note.location().longitude() << std::endl;
          StartWrite(&note);
          notes_iterator_++;
        } else {
          StartWritesDone();
        }
      }
      ClientContext context_;
      const std::vector<RouteNote> notes_;
      std::vector<RouteNote>::const_iterator notes_iterator_;
      RouteNote server_note_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

    Chatter chatter(stub_.get());
    Status status = chatter.Await();
    if (!status.ok()) {
      std::cout << "RouteChat rpc failed." << std::endl;
    }
  }

 private:  

  const float kCoordFactor_ = 10000000.0;
  std::unique_ptr<RouteGuide::Stub> stub_;
  std::vector<Feature> feature_list_;

};

int main(int argc, char** argv) {
  // Expect only arg: --db_path=path/to/route_guide_db.json.
  std::string db = routeguide::GetDbFileContent(argc, argv);
  RouteGuideClient guide(
      grpc::CreateChannel("localhost:50051",
                          grpc::InsecureChannelCredentials()),
      db);

  std::cout << "-------------- RouteChat --------------" << std::endl;
  guide.RouteChat();

  return 0;
}

On Wednesday, March 22, 2023 at 01:42:07 UTC+3, Dmitry Gorelov wrote:
Oh man, it does not work even with one client! Same problem in proto_utils.h.

Dmitry Gorelov

Mar 21, 2023, 7:00:25 PM
to grpc.io
Both of them, the client and the server, are from the route_guide example. I kept only the bidirectional part.
After a few attempts to run the client, either the client or the server crashes.

Please help me fix them!


On Wednesday, March 22, 2023 at 01:59:12 UTC+3, Dmitry Gorelov wrote:

Dmitry Gorelov

Mar 21, 2023, 7:15:55 PM
to grpc.io
[Attachment: 2023-03-22 02_14_54-Window.png]

On Wednesday, March 22, 2023 at 02:00:25 UTC+3, Dmitry Gorelov wrote:

Dmitry Gorelov

Mar 22, 2023, 5:52:22 AM
to grpc.io
Please check the following code; it fixes the crash in route_guide_callback_server!

         #include <algorithm>
         #include <chrono>
         #include <cmath>
         #include <iostream>
         #include <memory>

         #include <string>
         #include <thread>
         
         #include "helper.h"
         
         #include <grpc/grpc.h>
         #include <grpcpp/security/server_credentials.h>
         #include <grpcpp/server.h>
         #include <grpcpp/server_builder.h>
         #include <grpcpp/server_context.h>
         #ifdef BAZEL_BUILD
         #include "examples/protos/route_guide.grpc.pb.h"
         #else
         #include "route_guide.grpc.pb.h"
         #endif
         
         using grpc::CallbackServerContext;
         using grpc::Server;
         using grpc::ServerBuilder;
         using grpc::Status;
         using routeguide::Feature;
         using routeguide::Point;
         using routeguide::Rectangle;
         using routeguide::RouteGuide;
         using routeguide::RouteNote;
         using routeguide::RouteSummary;
         using std::chrono::system_clock;
                   notes_iterator_ = to_send_notes_.begin();
                   mu_->Unlock();
                   NextWrite();
                 } else {
                   //std::cout << "some client finished" << std::endl;
                   Finish(Status::OK);
                 }
               }
               void OnWriteDone(bool ok) override
               {
                 if (ok)
                 {
                   NextWrite();
                 }
                 else
                 {
                   std::cout << "some client finished write" << std::endl;
                   Finish(Status::OK);
                 }
               }
         
              private:
               void NextWrite()
               {
                 mu_->Lock();

                 if (notes_iterator_ != to_send_notes_.end()) {
                   StartWrite(&*notes_iterator_);
                   notes_iterator_++;
                 } else {          
                   received_notes_->push_back(note_);          
                   StartRead(&note_);
                 }
                 mu_->Unlock();

               }
               RouteNote note_;
               absl::Mutex* mu_;
               std::vector<RouteNote>* received_notes_;
               std::vector<RouteNote> to_send_notes_;
               std::vector<RouteNote>::iterator notes_iterator_;
             };
             return new Chatter(&mu_, &received_notes_);
           }
         
          private:
           std::vector<Feature> feature_list_;
           absl::Mutex mu_;
           std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
         };
         
         void RunServer(const std::string& db_path) {
           std::string server_address("0.0.0.0:50051");
           RouteGuideImpl service(db_path);
         
           ServerBuilder builder;
           builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
           builder.RegisterService(&service);
           std::unique_ptr<Server> server(builder.BuildAndStart());
           std::cout << "Server listening on " << server_address << std::endl;
           server->Wait();
         }
         
         int main(int argc, char** argv) {
           // Expect only arg: --db_path=path/to/route_guide_db.json.
           std::string db = routeguide::GetDbFileContent(argc, argv);
           RunServer(db);
         
           return 0;
         }
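
The fix above holds the shared `mu_` while calling `StartWrite()` and `StartRead()`, which makes every client wait on the same lock for the duration of those calls. The comment in the original example describes a narrower strategy: take the lock only to copy the matching notes, and again only to append the received note. The following is a minimal standard-library sketch of that idea; `Note` and `NoteBoard` are hypothetical names used for illustration and are not part of gRPC or of the route_guide example.

```cpp
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical simplified note: location plus message.
struct Note {
  int latitude;
  int longitude;
  std::string message;
};

// Hypothetical holder for the notes shared between all streams.
class NoteBoard {
 public:
  // Copies the notes at the given location while holding the lock. The
  // caller owns the returned vector and can send from it with no lock held.
  std::vector<Note> Snapshot(int latitude, int longitude) {
    std::lock_guard<std::mutex> lock(mu_);
    std::vector<Note> out;
    for (const Note& n : notes_) {
      if (n.latitude == latitude && n.longitude == longitude) {
        out.push_back(n);
      }
    }
    return out;
  }

  // Appends a received note; the lock is held only for the push_back.
  void Append(Note note) {
    std::lock_guard<std::mutex> lock(mu_);
    notes_.push_back(std::move(note));
  }

  std::size_t Size() {
    std::lock_guard<std::mutex> lock(mu_);
    return notes_.size();
  }

 private:
  std::mutex mu_;
  std::vector<Note> notes_;
};
```

A reactor built this way would call `Snapshot()` in its read callback, write from its private copy, and call `Append()` once the writes are finished, so the shared lock is never held across an I/O call.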

On Wednesday, March 22, 2023 at 02:15:55 UTC+3, Dmitry Gorelov wrote:

yas...@google.com

Mar 29, 2023, 1:59:50 PM
to grpc.io
Could I ask you to create a tracking issue for this on GitHub, please? Also, if you've got a working solution, contributions to the source are welcome :)

karthik karra

May 3, 2023, 11:58:11 AM
to grpc.io
Was anyone able to get the multiple async client example working?
In my example, the server receives only a single message from each client (I was trying with 2 clients). Both clients keep sending messages, but the server does not receive anything further.

Please correct me if I am wrong, but my understanding is this: when using the callback API, irrespective of the number of clients, only one server is registered, and gRPC takes care of instantiating as many server instances as needed to interact with the connected clients.
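
For comparison with the server code posted earlier in this thread: there, a single `RouteGuideImpl` object is registered, and its `RouteChat()` method returns a new `Chatter` reactor for every incoming call, so the per-stream state lives in the reactor while the notes vector is shared and guarded by a mutex. The sketch below models that shape with the standard library only; `Service` and `Session` are hypothetical stand-ins for `RouteGuideImpl` and `Chatter`, not gRPC types. In the real server the callbacks may arrive on different threads, which is why the shared vector is locked.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Per-call state. Hypothetical stand-in for the Chatter reactor.
class Session {
 public:
  Session(std::mutex* mu, std::vector<std::string>* shared)
      : mu_(mu), shared_(shared), seen_(0) {}

  // Handles one incoming note for this call.
  void OnNote(const std::string& note) {
    ++seen_;  // private to this session
    std::lock_guard<std::mutex> lock(*mu_);
    shared_->push_back(note);  // shared across all sessions
  }

  int seen() const { return seen_; }

 private:
  std::mutex* mu_;
  std::vector<std::string>* shared_;
  int seen_;
};

// One long-lived object. Hypothetical stand-in for RouteGuideImpl.
class Service {
 public:
  // Called once per incoming call; each call gets its own Session.
  std::unique_ptr<Session> NewSession() {
    return std::unique_ptr<Session>(new Session(&mu_, &notes_));
  }

  std::size_t TotalNotes() {
    std::lock_guard<std::mutex> lock(mu_);
    return notes_.size();
  }

 private:
  std::mutex mu_;
  std::vector<std::string> notes_;
};
```

Two clients therefore map to two `Session` objects created by the same `Service`; each keeps its own counters and buffers, and only the shared notes need locking.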

karthik karra

May 5, 2023, 1:23:10 PM
to grpc.io
Hi all, below is the code that I am trying to get working. After untarring, copy the files into the route_guide folder under examples/cpp in the grpc repo.

It includes a CMake file, which generates the client and server binaries.
To run:
1st terminal - ./server
2nd terminal - ./client <client_id> <num of streams>, e.g. ./client 1 1 or ./client 1 2

With one client and 1 stream it works well, but with one client and 2 streams, or 2 clients with 1 stream each, it sends and receives one message and that's it. Nothing happens after that.

Please provide some pointers on this.

The reason for structuring the code this way is that my main project has a similar design. Apologies for throwing a bunch of files at you.

Thanks for any help,
karthik

karthik karra

May 5, 2023, 1:25:42 PM
to grpc.io
Sorry, one correction: when I say nothing happens, I mean the client keeps sending messages but the server does not receive them, and as a result there is no reply to the client.