class ServiceA:
    """Nameko service that bridges NATS subjects to nameko events.

    The single RPC entry point connects to NATS, subscribes to every subject
    in ``self.__subjects`` and then runs a private asyncio event loop; each
    received NATS message is re-published as an ``Event_AWSEvent`` nameko
    event.

    The Python identifiers cannot contain ``-``, so the class is ``ServiceA``
    and the RPC method is ``service_a_start``; the nameko-visible service
    name below is still ``"Service-A"``.

    NOTE(review): nameko workers normally run under eventlet; running an
    asyncio loop inside a worker is assumed to be intentional -- confirm.
    """

    # Name under which nameko registers the service.
    name = "Service-A"
    CONFIG = {'AMQP_URI': "amqp://......"}

    # Dependency providers are injected by nameko only when declared as
    # class attributes; an instance created inside __init__ is never bound
    # to a worker and is not callable.
    dispatch = EventDispatcher()

    def __init__(self):
        self.__NATSserversList = ["nats://localhost:4222"]
        self.__natsClient = NATS()
        self.__subjects = ["k8s.events", "aws.events", "k8s.alarms", "aws.alarms"]
        # Dedicated loop, driven explicitly by service_a_start().
        self.__loop = asyncio.new_event_loop()
        print("Created a Service-A")

    @rpc
    def service_a_start(self):
        """Connect to NATS, subscribe, then run the event loop until stopped.

        This call blocks in ``run_forever()``, so the RPC does not return
        while the loop is running. The loop is closed on the way out, also
        when the connect/subscribe step raises.
        """
        try:
            self.__loop.run_until_complete(self.__natsReaderLoop(self.__loop))
            print("Starting event Loop")
            self.__loop.run_forever()
        finally:
            print("Closing event Loop")
            self.__loop.close()

    async def __readEventMessage(self, msg):
        """NATS subscription callback: forward ``msg`` as a nameko event.

        Failures are reported and swallowed so that one bad message does not
        tear down the subscription.
        """
        try:
            # The injected dispatcher takes (event_type, event_data)
            # positionally; it has no ``payload`` keyword.
            # NOTE(review): ``msg`` is the raw NATS message object -- confirm
            # the configured nameko serializer accepts it, otherwise pass
            # the decoded message data instead.
            self.dispatch("Event_AWSEvent", msg)
        except Exception as e:
            print("Exception in processing message on Topic")
            print(e)

    async def __natsReaderLoop(self, loop):
        """Connect the NATS client on ``loop`` and subscribe to all subjects.

        Errors are printed rather than raised, so the caller proceeds to run
        the loop even when the connection or a subscription failed.
        """
        try:
            # NOTE(review): ``io_loop`` is the keyword of older NATS asyncio
            # clients -- confirm against the installed client version.
            await self.__natsClient.connect(servers=self.__NATSserversList,
                                            io_loop=loop)
            # subscribe() takes one subject string per call, so every
            # subject gets its own subscription in the same queue group.
            for subject in self.__subjects:
                await self.__natsClient.subscribe(subject,
                                                  "nats-subscriber",
                                                  self.__readEventMessage)
        except Exception as e:
            print(e)


from nameko.containers import ServiceContainer

container = ServiceContainer(ServiceA, config=ServiceA.CONFIG)
# Materialised list of the container's extensions; not used further here.
service_extensions = list(container.extensions)
container.start()
try:
    # Bound to ``cluster_rpc`` so the module-level ``rpc`` decorator is not
    # rebound by the ``with`` statement.
    with ClusterRpcProxy(ServiceA.CONFIG) as cluster_rpc:
        # "Service-A" is not a valid attribute name in source code, so the
        # service proxy is looked up by its registered name via getattr().
        getattr(cluster_rpc, ServiceA.name).service_a_start()
finally:
    # Stop the container once the (blocking) start call ends or fails.
    container.stop()