Nameko stuck in a self.__dispatch() call when one service sends an event to another service


nsam...@gmail.com

Aug 1, 2018, 7:35:42 AM
to nameko-dev
I am running Nameko 2.9.0 with Python 3.5.

I have a piece of code in which Service-A needs to dispatch an event to Service-B.
Service-A reads messages from a NATS queue and passes them on to another service (Service-B). When I run it, the Nameko service for Service-A gets stuck in the EventDispatcher call that should send the event to Service-B.

Any clues?


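import asyncio

from nats.aio.client import Client as NATS  # assumed: asyncio-nats-client
from nameko.events import EventDispatcher
from nameko.rpc import rpc

class Service-A: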
    name = "Service-A"
    CONFIG = {'AMQP_URI' : "amqp://......"}


    def __init__(self):
        self.__NATSserversList = ["nats://localhost:4222"]
        self.__natsClient = NATS()
        self.__subjects = ["k8s.events", "aws.events", "k8s.alarms", "aws.alarms"]
        self.__loop = asyncio.new_event_loop()
        self.__dispatch = EventDispatcher()
        print("Created a Service-A")

    @rpc
    def Service-A_start(self):
        self.__loop.run_until_complete(self.__natsReaderLoop(self.__loop))
        try:
            print("Starting event Loop")
            self.__loop.run_forever()
        finally:
            print("Closing event Loop")
            self.__loop.close()


    async def __readEventMessage(self, msg):
        try:
            self.__dispatch("Event_AWSEvent", payload=msg)
        except Exception as e:
            print("Exception in processing message on Topic")

    def __natsReaderLoop(self, loop):
        try:
            yield from self.__natsClient.connect(servers=self.__NATSserversList,
                                                 io_loop=self.__loop)
            yield from self.__natsClient.subscribe(self.__subjects,
                                                       "nats-subscriber",
                                                       self.__readEventMessage)

        except Exception as e:
            print(e)

from nameko.containers import ServiceContainer
from nameko.standalone.rpc import ClusterRpcProxy
container = ServiceContainer(Service-A, config=Service-A.CONFIG)
service_extensions = list(container.extensions)
container.start()
with ClusterRpcProxy(Service-A.CONFIG) as rpc:
    rpc.Service-A.Service-A_start()



nsam...@gmail.com

Aug 1, 2018, 10:07:03 AM
to nameko-dev
Hi,

I just tried changing the dispatcher ('dispatch') from an instance variable to a class variable, and it seemed to work.
What's the logic? (I am a newbie to Nameko.)

Do all instances of the service share a dispatcher?
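
A possible explanation, sketched as a toy model rather than real Nameko code: as far as I understand it, the service container looks for dependency declarations on the service class and, for each worker, replaces that attribute with an injected object. A declaration created inside __init__ would never be seen by the container, so the attribute would stay a raw, unbound declaration. ToyDispatcher, ToyContainer, ClassLevel and InstanceLevel below are hypothetical names made up for this illustration; they only mimic the idea and are not part of Nameko.

```python
import inspect


class ToyDispatcher:
    """Hypothetical stand-in for a dependency declaration such as EventDispatcher."""

    def bind(self, attr_name):
        # Remember which class attribute this declaration was found under.
        self.attr_name = attr_name
        return self

    def get_dependency(self):
        # Build a fresh callable for each worker; the declaration itself is shared.
        sent = []

        def dispatch(event_type, payload):
            sent.append((event_type, payload))
            return sent

        return dispatch


class ToyContainer:
    """Hypothetical, heavily simplified model of a service container."""

    def __init__(self, service_cls):
        self.service_cls = service_cls
        # Only attributes visible on the class itself are discovered here.
        self.providers = [
            member.bind(name)
            for name, member in inspect.getmembers(service_cls)
            if isinstance(member, ToyDispatcher)
        ]

    def new_worker(self):
        service = self.service_cls()
        for provider in self.providers:
            # Swap the declaration for the injected, callable dependency.
            setattr(service, provider.attr_name, provider.get_dependency())
        return service


class ClassLevel:
    dispatch = ToyDispatcher()  # declared on the class: discovered and injected


class InstanceLevel:
    def __init__(self):
        self.dispatch = ToyDispatcher()  # created per instance: never discovered


good = ToyContainer(ClassLevel).new_worker()
bad = ToyContainer(InstanceLevel).new_worker()

print(callable(good.dispatch))  # True: the injected function
print(callable(bad.dispatch))   # False: still the raw declaration object
```

Under this model the declaration is shared by the class, but every worker gets its own injected object, so instances would not share one live dispatcher.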