Pub/Sub server data push not received

Gyorgy Miru

May 6, 2022, 7:22:50 AM
to Cap'n Proto
Hi all,

I am trying to implement a simple architecture where various backend servers gather data (on Android devices) and push it to subscribers. In the backend, the data gatherer runs in a separate thread from the server. My current issue is that messages sent to the subscriber are never received on the client side. I am not sure whether I need to wait on the promise returned by the send() call in order to actually put the message on the wire, or whether the problem is completely unrelated.

First, I tried using executor->executeSync() from the gatherer thread to send the messages, but then I can't wait on the promise because the code runs in an event callback.
I also tried starting a kj event loop in the gatherer thread and publishing the messages from there, but when I try to wait I get the following exception: "Nothing to wait for; this thread would hang forever."
Please find the relevant code samples below:

test.capnp

@0xf84f5237f35aec13;

interface Proxy {
   getBackend @0 () -> (backend: Backend);
}

interface Backend {
   consume @0 (msg :Message);
   subscribe @1 (receiver :Receiver);
}

interface Receiver {
   receive @0 (msg: Message);
}

struct Message {
   body @0 :Data;
}

backend.cpp

class BackendImpl final: public Backend::Server {
    private:
        bool first_received;
        bool receiver_ready;
        unsigned recv_count;
        unsigned count;
        unsigned size;
        chrono::time_point<chrono::high_resolution_clock> begin;
        Receiver::Client receiver;
   
    public:
        BackendImpl(unsigned cnt): first_received(false), receiver_ready(false), count(cnt), receiver(nullptr) {};
        kj::Promise<void> consume(ConsumeContext context) override {
            //... Benchmark tests are omitted
            return kj::READY_NOW;
        }

        kj::Promise<void> subscribe(SubscribeContext context) override {
            this->receiver = context.getParams().getReceiver();
            cout << "Receiver received" << endl;
            this->receiver_ready = true;
            return kj::READY_NOW;
        }

        //bool send(int cnt, int size, kj::WaitScope& ws) { // I have tried passing the wait scope
        bool send(int cnt, int size) {
            if (!this->receiver_ready) {
                return false;
            }
            this->receiver_ready = false;
            cout << "Sending data" << endl;
            unsigned char* msg = new unsigned char[size];
            memset(msg, 'a', size);


            for (int i = 0; i < this->count; i++) {
               
                auto request = this->receiver.receiveRequest();
                request.getMsg().setBody(capnp::Data::Reader(msg, size));
                auto promise = request.send();
                //auto promise = request.send().wait(ws);

                auto res = promise.then([](capnp::Response<Receiver::ReceiveResults>&& result){
                    cout << result.totalSize().wordCount << endl << flush; // This never gets called
                    return result.totalSize().wordCount;
                    });

            }
            delete[] msg;
            cout << "Sending done" << endl << flush;
            return true;
        }
};

BackendImpl* backend;
void data_pusher(const kj::Executor* executor, int size, int cnt) {

    //kj::EventLoop loop;
    //loop.run();
    //kj::WaitScope waitScope(loop);
    while (executor->isLive()) {
        this_thread::sleep_for(1s);

        //backend->send(cnt, size, waitScope);

        executor->executeSync( [backend=backend, cnt=cnt, size=size] {
                backend->send(cnt, size);
            });
    }    
}

int main(int argc, const char* argv[]) {

  auto be = kj::heap<BackendImpl>(10000);
  backend = be.get(); // There must be a sexier way to do this
 
  // Set up a server.
  capnp::EzRpcServer server(kj::mv(be), "127.0.0.1:11223");

  kj::WaitScope* waitScope = &server.getWaitScope(); // declared locally; the original snippet never declares it

  const kj::Executor& executor = kj::getCurrentThreadExecutor();
  std::thread thr(data_pusher, &executor, 4096, 10000);
  std::cout << "Backend Ready" << std::endl;

  // Run forever, accepting connections and handling requests.
  kj::NEVER_DONE.wait(*waitScope);
  thr.join();
}

client.py

import time

import capnp

capnp.remove_import_hook()
a = capnp.load('./test.capnp')

class ReceiverImpl(a.Receiver.Server):

    def __init__(self, count):
        self.count = count
        self.has_received = False
        self.recv_cnt = 0

    def receive(self, msg, _context, **kwargs):
        print("We get called, yay!")
        #... benchmarking code omitted

def direct_subscribe():

    client = capnp.TwoPartyClient('127.0.0.1:11223')
    backend = client.bootstrap().cast_as(a.Backend)
    #backend = proxy.getBackend().backend

    receiver = ReceiverImpl(10000)

    backend.subscribe(receiver).wait()

    while receiver.recv_cnt < 10000:
        time.sleep(1)

direct_subscribe()

When I run this code without any wait() calls in the publisher, the send loop completes immediately, but the receiver's receive() function is never called.

Thank you for your time and assistance.
-Gym

Leon Wessels

May 10, 2022, 3:00:29 AM
to Cap'n Proto
In your send function you should detach the promise:
request.send().detach([&](kj::Exception&& error) { /*handle returned error*/; });
If you don't, the promise is destroyed when it goes out of scope and the call gets cancelled.
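
The cancel-on-destroy behaviour can be illustrated with a rough analogy in plain Python asyncio. This is only a sketch and does not use Cap'n Proto: asyncio does not cancel a task when its handle goes out of scope, so the example calls cancel() explicitly to mimic a KJ promise being destroyed. The names deliver, send_and_drop and send_and_keep are hypothetical.

```python
import asyncio


async def deliver(log, i):
    # Stand-in for one RPC: yield to the event loop once, then record delivery.
    await asyncio.sleep(0)
    log.append(i)


async def send_and_drop(log, count):
    for i in range(count):
        task = asyncio.ensure_future(deliver(log, i))
        # Assumption: explicit cancel() mimics a KJ promise being destroyed
        # at the end of the loop body. asyncio would not do this by itself.
        task.cancel()
    # Give the event loop time to run; the cancelled work never delivers.
    await asyncio.sleep(0.01)


async def send_and_keep(log, count):
    # Keep every task alive until it completes, so the work is not cancelled.
    tasks = [asyncio.ensure_future(deliver(log, i)) for i in range(count)]
    await asyncio.gather(*tasks)


dropped, kept = [], []
asyncio.run(send_and_drop(dropped, 3))
asyncio.run(send_and_keep(kept, 3))
print(dropped, kept)
```

In the sketch, the dropped variant delivers nothing while the kept variant delivers every message. In KJ, detach() plays the keeping role by letting the promise continue in the background without the caller waiting on it.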

Gyorgy Miru

May 10, 2022, 7:16:54 AM
to Cap'n Proto
Thank you for your response. It helped me find the cause of the problem.
The main reason the 'receive' callback was not executed on the client side was the sleep loop, which starves the event loop.
In the Python client, control has to be handed to the event loop explicitly by calling 'capnp.poll_once()'.
The following modification takes care of that:

    client = capnp.TwoPartyClient('127.0.0.1:11223')
    backend = client.bootstrap().cast_as(a.Backend)
    #backend = proxy.getBackend().backend

    receiver = ReceiverImpl(1000)

    backend.subscribe(receiver).wait()
    print("Subscribed to backend")
    while receiver.recv_cnt < 1000:
        #time.sleep(1)
        time.sleep(0.001)
        capnp.poll_once()

The promise also needs to be detached on the server side, as you suggested; otherwise it never gets resolved.
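
Why the sleep loop starves the receiver can be shown with a toy model in plain Python. This is a sketch under stated assumptions, not pycapnp code: ToyLoop and wait_for are hypothetical names, and the only property modelled is that queued callbacks run when the loop is polled and at no other time.

```python
import collections
import time


class ToyLoop:
    """Hypothetical single-threaded event loop: callbacks run only when polled."""

    def __init__(self):
        self._pending = collections.deque()

    def post(self, callback):
        # Queue work, the way an incoming call would wait for the receiver.
        self._pending.append(callback)

    def poll_once(self):
        # Run everything currently queued and report how many callbacks ran.
        ran = 0
        while self._pending:
            self._pending.popleft()()
            ran += 1
        return ran


def wait_for(loop, received, target, pump, max_iterations=50):
    """Sleep-loop until `target` messages have arrived; optionally pump the loop."""
    for _ in range(max_iterations):
        if len(received) >= target:
            return True
        time.sleep(0.001)
        if pump:
            loop.poll_once()
    return False


loop = ToyLoop()
received = []
for i in range(5):
    loop.post(lambda i=i: received.append(i))

starved = wait_for(loop, received, 5, pump=False)   # sleeps only, never polls
count_while_starved = len(received)
delivered = wait_for(loop, received, 5, pump=True)  # sleeps and polls
print(starved, count_while_starved, delivered, received)
```

With pump=False the loop only sleeps, so nothing queued is ever delivered, which matches the original client. With pump=True each iteration gives the loop a chance to run, which is the role 'capnp.poll_once()' has in the modified client.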