Unsubscribe in pub/sub model


mitsuo

Nov 22, 2021, 1:40:32 AM
to Cap'n Proto
Hi,

I'm trying to implement a pub/sub-like communication model with the following schema.
pubsub.capnp
interface EventPublisher{
    interface Subscriber {
        updateEvent @0 (event: Int32) -> ();
    }
    subscribe @0 (subscriber: Subscriber) -> (result: Int32);
    unsubscribe @1 (subscriber: Subscriber) -> (result: Int32);
}

I'm using kj::Vector<EventPublisher::Subscriber::Client> m_subscribers to store the current subscribers.
When I try to implement unsubscribe and remove the Subscriber from the Vector, I can't find a good way to do that.
Could you give me some advice?

server implementation
class EventPublisherImpl final : public EventPublisher::Server {
 protected:
  ::kj::Promise<void> subscribe(SubscribeContext context) {
    cout << "subscribe request received" << endl;
    m_subscribers.add(context.getParams().getSubscriber());
    return kj::READY_NOW;
  }

  ::kj::Promise<void> unsubscribe(UnsubscribeContext context) {
    cout << "unsubscribe request received" << endl;
    auto unsub = context.getParams().getSubscriber();

    // I want to remove unsub from subscribers like m_subscribers[unsub].erase();
    // But I couldn't find a way to compare
    // "EventPublisher::Subscriber::Client" values, such as an == operator or a
    // public method to distinguish the client.
    //
    // One solution is having an additional argument (id) for this purpose, but
    // that requires additional management of the ID.
    //  subscribe @0 (subscriber: Subscriber, id: Int32) -> (result: Int32);
    //  unsubscribe @1 (id: Int32) -> (result: Int32);

    // What I can do is erase everything, but this is not my goal.
    m_subscribers.clear();
    return kj::READY_NOW;
  }

 private:
  kj::Vector<EventPublisher::Subscriber::Client> m_subscribers;
};
Thank you,

Kenton Varda

Nov 22, 2021, 11:53:42 AM
to mitsuo, Cap'n Proto
Hi Mitsuo,

I recommend designing the interface like this:

interface EventPublisher{
    interface Subscriber {
        updateEvent @0 (event: Int32) -> ();
    }
    interface Subscription {}
    subscribe @0 (subscriber: Subscriber) -> (result: Int32, subscription: Subscription);
    # To unsubscribe, drop the returned `subscription`.
}

Here, subscribe() returns a `subscription` object. This object has no methods, but when the capability is dropped, the destructor runs on the server side. At that point, you can remove the subscription.

A big advantage of this approach is that it handles connection failures gracefully. If the client randomly disconnects, the `subscription` capability is automatically dropped, thus unsubscribing the client. This way you don't end up stuck sending messages to a disconnected subscriber.

It also solves your problem because you can remember arbitrary metadata about the subscriber within the `Subscription` object, so you know what to remove when it's destroyed.

-Kenton
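
The drop-to-unsubscribe idea described above can be sketched in plain C++, without Cap'n Proto, so that it compiles on its own. This is only a model under stated assumptions: `Publisher`, `Subscription`, `subscriberCount`, and `demo` are hypothetical names, a `std::function` stands in for `EventPublisher::Subscriber::Client`, and resetting a `std::unique_ptr` stands in for the client dropping the capability. In a real server the `Subscription` class would presumably derive from the generated `EventPublisher::Subscription::Server`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <utility>

// Hypothetical stand-in for EventPublisher::Subscriber::Client.
using Subscriber = std::function<void(std::int32_t)>;

class Publisher {
 public:
  // Stand-in for the method-less Subscription capability. It only remembers
  // which entry it owns, and removes that entry when it is destroyed.
  class Subscription {
   public:
    Subscription(Publisher& publisher, std::uint64_t id)
        : publisher_(publisher), id_(id) {}
    ~Subscription() { publisher_.subscribers_.erase(id_); }
    Subscription(const Subscription&) = delete;
    Subscription& operator=(const Subscription&) = delete;

   private:
    Publisher& publisher_;  // assumption: the publisher outlives the subscription
    std::uint64_t id_;
  };

  // Registers the subscriber and hands back the object whose lifetime
  // controls the registration.
  std::unique_ptr<Subscription> subscribe(Subscriber subscriber) {
    std::uint64_t id = nextId_++;
    subscribers_.emplace(id, std::move(subscriber));
    return std::make_unique<Subscription>(*this, id);
  }

  // Delivers the event to every subscriber that is still registered.
  void publish(std::int32_t event) {
    for (auto& entry : subscribers_) entry.second(event);
  }

  std::size_t subscriberCount() const { return subscribers_.size(); }

 private:
  std::map<std::uint64_t, Subscriber> subscribers_;
  std::uint64_t nextId_ = 0;
};

// Exercises the sketch and returns how many events the subscriber received.
int demo() {
  Publisher publisher;
  int received = 0;
  auto subscription = publisher.subscribe([&](std::int32_t) { ++received; });
  publisher.publish(1);  // delivered to the one subscriber
  assert(publisher.subscriberCount() == 1);
  subscription.reset();  // models dropping the capability
  assert(publisher.subscriberCount() == 0);
  publisher.publish(2);  // no subscribers left, nothing is delivered
  return received;
}
```

The id is generated and kept entirely on the server side, so the schema needs no extra `id` argument and no way to compare `Client` values.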


mitsuo

Nov 23, 2021, 9:40:55 PM
to Cap'n Proto
Hi Kenton,

Thank you for your quick response! It's very helpful, because handling disconnection was actually another issue I had.

Jens Alfke

Dec 2, 2021, 3:22:44 PM
to Cap'n Proto
I'm also implementing pub-sub, so I was glad to see this thread before I wasted too much time. I'm implementing this pattern, but having trouble on the client side.

In terms of the example interface, I've created my SubscriberImpl class, and written the code to send the "subscribe" message. Now I want to store the returned Subscription capability reference in my SubscriberImpl, so it can own it and drop it when it's done.

However, I can't figure out how to keep a reference to the SubscriberImpl, since I have to move it to the Request object (calling setSubscriber) and afterwards it's gone, so I can't call it again.

            auto rq = remotePublisher.subscribeRequest();
            auto impl = kj::heap<SubscriberImpl>();
            rq.setSubscriber(std::move(impl));
            auto promise = rq.send().then([&](auto response) {return response.getSubscription();});
            // somehow convey the promise to the SubscriberImpl...?

I'm sure this is just due to my incomplete understanding of how Promise/Client/Server objects work...
Thanks,

--Jens

Kenton Varda

Dec 3, 2021, 1:02:18 PM
to Jens Alfke, Cap'n Proto
What I usually do is something like:

auto impl = kj::heap<SubscriberImpl>();
auto& ref = *impl;
Subscriber::Client client = kj::mv(impl);
rq.setSubscriber(client);

Now you can keep a copy of `client` locally, and as long as it still exists, then `ref` remains valid -- because `client` is itself a strong reference to the object.

Note this doesn't work if you need to get notification of when the `SubscriberImpl` is dropped from the remote side, since holding a strong ref locally prevents the destructor from being called. If this is an issue, then you need to do something different. What I usually do here is store a pointer somewhere, and then have the destructor null out that pointer. This effectively implements a weak reference.

-Kenton
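
The "store a pointer somewhere, and have the destructor null out that pointer" approach can be sketched as follows. This is a standalone model rather than Cap'n Proto code: the `slot` parameter, the `lastEvent` accessor, and `demo` are hypothetical, and a `std::unique_ptr` stands in for the capability reference that the RPC system would hold.

```cpp
#include <cassert>
#include <memory>

class SubscriberImpl {
 public:
  // `slot` is a pointer variable owned by the caller. It must outlive this
  // object, because the destructor writes to it.
  explicit SubscriberImpl(SubscriberImpl*& slot) : slot_(slot) { slot_ = this; }
  ~SubscriberImpl() { slot_ = nullptr; }  // tells the caller the object is gone
  SubscriberImpl(const SubscriberImpl&) = delete;
  SubscriberImpl& operator=(const SubscriberImpl&) = delete;

  void updateEvent(int event) { lastEvent_ = event; }
  int lastEvent() const { return lastEvent_; }

 private:
  SubscriberImpl*& slot_;
  int lastEvent_ = 0;
};

// Returns true if the weak pointer was valid while the object lived and was
// nulled out once the owning reference went away.
bool demo() {
  SubscriberImpl* weak = nullptr;
  // Models the only strong reference, held on behalf of the remote side.
  auto owner = std::make_unique<SubscriberImpl>(weak);
  bool validWhileAlive = (weak != nullptr);
  if (weak != nullptr) weak->updateEvent(7);  // local access through the weak pointer
  assert(owner->lastEvent() == 7);
  owner.reset();  // models the remote side dropping its last reference
  return validWhileAlive && weak == nullptr;
}
```

Because nothing local holds a strong reference, the destructor still runs when the remote side lets go, and every local use has to check the pointer for null first.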

Dane

Feb 8, 2022, 12:56:55 PM
to Cap'n Proto
Hi All,

I'm implementing a similar thing, but with an asynchronous client in Python. I can't for the life of me get the client object to become dereferenced.
The asyncio tx/rx seems to get stuck trying to read from or write to the now-missing publisher.

I think the async loop owns a reference to the subscriber object (as well as the publisher), but I need to know when the publisher releases the reference in order to stop the async loop. 

Any ideas? 
