[python] New Component API and dynamic subscribe/unsubscribe register/unregister, how to ? (asyncio)

37 views
Skip to first unread message

Potens

unread,
Aug 20, 2018, 11:09:06 AM8/20/18
to Autobahn
Hi everyone,

I'm trying to use the new component API and I'm not able to find how to manage the subscribe/unsubscribe/register/unregister.

I started from the little Klein example in the doc (https://autobahn.readthedocs.io/en/latest/wamp/programming.html#patterns-for-more-complicated-applications) and came to this:

import asyncio
from autobahn.asyncio.component import Component, run
from autobahn import wamp

comp = Component(
    transports=[{
        "type": "rawsocket",
        "url": "/tmp/mysocket1",
        "endpoint": {
            "type": "unix",
            "path": "/tmp/mysocket1"
        }
    }],
    realm=u"realm1",
)

class MyClass:
    def __init__(self,  wamp_comp):
        self._wamp = wamp_comp
        self._session = None
        self._wamp.on("join", self._initialize)
        self._wamp.on("leave", self._uninitialize)

    async def _initialize(self, session: wamp.ISession, details):
        self._session = session
        print("Subscribed")
        await session.subscribe(handler=self.output, topic="com.test")

    def _uninitialize(self, session, reason):
        print(session, reason)
        print("Lost WAMP connection")
        self._session = None

    async def output(self, value):
        print("ok")
        print(value)
        await asyncio.sleep(0)

@comp.on_join
async def joined(session, details):
    print("session ready")
    count = 0
    while True:
        session.publish(topic="com.test", value="hello", count=count)
        count += 1
        print("hello")
        await asyncio.sleep(5)

if __name__ == "__main__":
    myobj = MyClass(comp)
    run([comp])

But, until now I've not been able to get something working when I do session.*.

Right now, the output is:
2018-08-20T16:52:13 connecting once using transport type "rawsocket" over endpoint "unix"
2018-08-20T16:52:13 ApplicationSession started.
session ready
hello
Subscribed
hello
hello
^C2018-08-20T16:52:25 Shutting down due to SIGINT

So the subscriber do not work.

The aim of this is to be able to use the subscribe, unsubscribe register unregister without the decorators because the topics and so on will be defined at run time (this example is utterly simplified to stress my problem)(and also without the old ApplicationSession)

If someone has a little clue for me, it would be great.

Nicolas

Adrien Emery

unread,
Aug 23, 2018, 12:41:05 AM8/23/18
to Autobahn
Your only running one component - you need a second component to subscribe to the topic being published by the current component you have.
Just tweak your the run function to run two components so they can talk balk and forth.

if __name__ == "__main__":
    myobj = MyClass(comp)
    run([comp, comp])

Potens

unread,
Aug 23, 2018, 4:39:08 AM8/23/18
to Autobahn
Hi,

Thanks a lot for your answer, it helped me to find my mistake. Trying your advice resulted having two sessions, subscriptions and publish, so everything twice (but the first publish, only once), trying to understand further what was going on (trying to understand why I would have to have two components), I understood I had forgotten to use options=wamp.PublishOptions(exclude_me=False) in the publish... :(
Now it's working, the only thing is I loose the first publish since (I guess) the decorator comp.join is done before the _initialize.

So, to remember, read and re-read the doc (autobahn and crossbar).

Thanks a lot Adrien, your answer made me ask myself the good questions to understand my mistake.

For the records, here is my final script:

import asyncio

from autobahn import wamp

from autobahn.asyncio.component import Component, run

comp = Component(
    transports=[{
        "type": "rawsocket",
        "url": "/tmp/mysocket1",
        "endpoint": {
            "type": "unix",
            "path": "/tmp/mysocket1"
        }
    }],
    realm=u"realm1",
)


class MyClass:
    def __init__(self, wamp_comp):
        self._wamp = wamp_comp
        self._session = None
        self._wamp.on("join", self._initialize)
        self._wamp.on("leave", self._uninitialize)

    def _initialize(self, session: wamp.ISession, details):
        self._session = session
        print("Subscribed")
        print("session")
        print(session)

        session.subscribe(handler=self.output, topic="com.test")

    def _uninitialize(self, session, reason):
        print(session, reason)
        print("Lost WAMP connection")
        self._session = None

    async def output(self, value, *args, **kwargs):
        print("output:")
        print("\tvalue :", value)
        print("\targs :", args)
        print("\tkwargs :", kwargs)

        await asyncio.sleep(0)


@comp.on_join
async def joined(session, details):
    print("session ready")
    count = 0
    while True:
        session.publish(
            topic="com.test",
            value="hello",
            count=count,
            options=wamp.PublishOptions(exclude_me=False))
        count += 1
        print("publishing hello ", count)

        await asyncio.sleep(5)


if __name__ == "__main__":
    myobj = MyClass(comp)
    run([comp])

And the output:

2018-08-23T10:30:50 connecting once using transport type "rawsocket" over endpoint "unix"
2018-08-23T10:30:50 ApplicationSession started.
Subscribed
session
<autobahn.asyncio.wamp.Session object at 0x7f9c1bbb3f60>
session ready
publishing hello  1
output:
        value : hello
        args : ()
        kwargs : {'count': 0}
publishing hello  2
output:
        value : hello
        args : ()
        kwargs : {'count': 1}
publishing hello  3
output:
        value : hello
        args : ()
        kwargs : {'count': 2}
^C2018-08-23T10:31:00 Shutting down due to SIGINT


Adrien Emery

unread,
Aug 23, 2018, 12:46:26 PM8/23/18
to Autobahn
Great! And I didn't know you could publish messages and listen from the same component using: 

options=wamp.PublishOptions(exclude_me=False)

Cheers!
Reply all
Reply to author
Forward
0 new messages