"regular" message queueing

242 views
Skip to first unread message

remco....@gmail.com

unread,
Feb 17, 2016, 5:48:47 PM2/17/16
to nameko-dev
Hi, 

I was searching for a way to use 'regular' queueing in nameko. 
It makes sense to have one service for sending mail. Using RPC from other services to this mailer service is possible but let's say i don't need the response, so why wait for an answer right? Here are a few options that came to mind: 

 a) call the rpc using async and 'forget' about the response

 b) use RPC to call a function in the mailsender that will drop an event to itself and return (nearly) instantly and let nameko handle the event to be picked up asynchronously - but it's quite an overload considering it to be a workaround for simple queueing. 

 c) use nameko.standalone.events.event_dispatcher and drop it right where i want the event to go, but then I lose all the beauty the providers/extensions and the mocks and testing those provide. 

What would be the preferred way to go about this from namekos philosophy? 

Tried something like but had no luck whatsoever.: 

    from nameko.messaging import Publisher, consume 
    from kombu import Exchange, Queue
    ew_exchange = Exchange('ew_messenger',)
    ew_sendmail_queue = Queue('mailqueue',exchange=ew_exchange,routing_key='sendmail')

    class someservice(object): 
        name='some'
        send_mail = Publisher(exchange=ew_exchange, queue=ew_sendmail_queue)
        def ... 
             self.send_mail(msg)


    class mailsender(object):
        name='mailsender'
        @event_handler('ew_messenger','mailqueue')
        def foo():
             # ... 
    


Clarity is greatly appreciated. 

Thanks, 
Remco Boerma

David Szotten

unread,
Feb 18, 2016, 4:27:10 AM2/18/16
to nameko-dev, remco....@gmail.com
Hi Remco,

Both a) and b) seem like reasonable options to me. 

"forgetting"  (or ignoring) the response (which may be empty if you're never going to use it) seems perfectly fine.

b) might let you add some more features if you want, e.g. the email service's rpc call could reply with some identifier and let clients later enquire about the result of the send status etc, whilst internally using some other mechanism to delay the sending. that could itself be an async rpc call, an event, or some different mechanism such as an external queue e.g. in a db. unless your email volumes are huge you may not need to worry about the overhead here

Best,
David

sophandmat...@gmail.com

unread,
Feb 18, 2016, 5:23:59 AM2/18/16
to nameko-dev, remco....@gmail.com
The Publisher and Consumer in nameko.messaging exist for this purpose. Events are a special case of "regular" messaging that uses a specific exchange, routing keys and queue names.

You are on the right track with the Publisher, but need to consume messages with the Consumer rather than an event_handler.

# messages.py

from kombu import Exchange, Queue
from nameko.messaging import Publisher, consume
from nameko.rpc import rpc


ew_exchange
= Exchange('ew_messenger',)

ew_sendmail_queue
= Queue('mailqueue', exchange=ew_exchange, routing_key='sendmail')


class SomeService(object):
    name
= 'some'

    send_mail
= Publisher(exchange=ew_exchange, queue=ew_sendmail_queue)

   
@rpc  # expose with rpc so we can test it easily
   
def send(self, msg):
       
self.send_mail(msg, routing_key=ew_sendmail_queue.routing_key)
       
return "sent!"


class MailSender(object):
    name
= 'mailsender'

   
@consume(ew_sendmail_queue)
   
def send_mail(self, msg):
        print("sending `{}`".format(msg))


Nameko Python 2.7.10 (default, Jul  6 2015, 15:19:48)
[GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] shell on darwin
Broker: amqp://guest:guest@localhost
>>> n.rpc.some.send("message!")
u
'sent!'


(tmp-2dfbeabdba274978)Matts-13-inch-Macbook-Pro:tmp mattbennett$ nameko run messages
starting services
: some, mailsender
Connected to amqp://guest:**@127.0.0.1:5672//
Connected to amqp://guest:**@127.0.0.1:5672//
sending
`message!`



For this particular use-case you can make it even simpler by removing the ew_exchange and using the default exchange instead -- then you'd publish with a routing_key equal to the destination queue name.

Matt.

remco....@gmail.com

unread,
Feb 18, 2016, 7:50:49 AM2/18/16
to nameko-dev, remco....@gmail.com
Thank you David and Matt for the clarity. 

@Matt, you mention:
For this particular use-case you can make it even simpler by removing the ew_exchange and using the default exchange instead -- then you'd publish with a routing_key equal to the destination queue name.

Which would be as simple as 
ew_exchange = Exchange()

or would it require more change to the code? 

Is this documented in the manual? If so I missed it about 40 times reading it again and again :)

With kind regards, 
Remco 


Matt Bennett

unread,
Feb 18, 2016, 11:26:32 AM2/18/16
to nameko-dev, remco....@gmail.com
You don't need to specify an exchange at all. All queues are explicitly bound to the default exchange, and you publish to them by using the queue name as the routing key.

from kombu import Queue

from nameko.messaging import Publisher, consume
from nameko.rpc import rpc


ew_sendmail_queue
= Queue('mailqueue')



class SomeService(object):
    name
= 'some'



    send_mail
= Publisher(queue=ew_sendmail_queue)



   
@rpc  # expose with rpc so we can test it easily
   
def send(self, msg):

       
self.send_mail(msg, routing_key="mailqueue")

       
return "sent!"




class MailSender(object):
    name
= 'mailsender'


   
@consume(ew_sendmail_queue)
   
def send_mail(self, msg):
       
print("sending `{}`".format(msg))

The Publisher and Consumer are not mentioned in the nameko docs I'm afraid. The default exchange behaviour is mentioned in the RabbitMQ tutorial docs (https://www.rabbitmq.com/getstarted.html), which are excellent.

remco....@gmail.com

unread,
Feb 18, 2016, 1:19:18 PM2/18/16
to nameko-dev, remco....@gmail.com
Thanks for the headsup. 

The RabbitMQ documentation is excellent. I remembered the default from there, but i wasn't sure if you would refer to that or another default.  

I do believe the Consumer and Publisher deserve an appropriate place in the docs. Even though it's "simple" it's definitely something worth mentioning to handle more of the common scenarios.  

The more I work with nameko the more I like it. Not having to handle the gory messaging details is a real pleasure. One thing that is bugging me a little though is the sending of datetime objects. I convert them to isoformat now. How do you go about this? 

The example is really great. So simple, so effective. Thanks a bunch! 
What happens if a different routing_key is used? What's the benefit of the freedom of not being forced to use the name which is already present from within send_mail? 

With kind regards, 
Remco 

Op donderdag 18 februari 2016 17:26:32 UTC+1 schreef Matt Bennett:

conor.s...@gmail.com

unread,
Feb 22, 2016, 1:45:55 PM2/22/16
to nameko-dev, remco....@gmail.com
Great stuff. This is exactly what I needed!

remco....@gmail.com

unread,
Mar 3, 2016, 5:05:39 PM3/3/16
to nameko-dev, remco....@gmail.com
How do you do this from a 'standalone' / foreign application? Any built-in support for that?


Op donderdag 18 februari 2016 17:26:32 UTC+1 schreef Matt Bennett:
You don't need to specify an exchange at all. All queues are explicitly bound to the default exchange, and you publish to them by using the queue name as the routing key.

Matt Bennett

unread,
Mar 6, 2016, 6:58:33 AM3/6/16
to nameko-dev, remco....@gmail.com
For "regular" AMQP messaging there's nothing nameko specific, so we didn't bother adding anything to the nameko.standalone package. You can use any AMQP library to publish or consume messages. Nameko uses kombu but there are plenty of others.

For an example using kombu, see the nameko.standalone.events module. A nameko event is nothing more than a normal AMQP message with a predefined exchange and routing key, so you won't have to modify it much.

remco....@gmail.com

unread,
Mar 10, 2016, 9:05:58 AM3/10/16
to nameko-dev, remco....@gmail.com
Thanks. This is what I had in mind, but just thought to ask if i hadn't missed anything. 

Op zondag 6 maart 2016 12:58:33 UTC+1 schreef Matt Bennett:
Reply all
Reply to author
Forward
0 new messages