basic_consume with callback function and passing arguments


Shreyans Soni

Oct 14, 2015, 10:14:36 AM
to Pika
Hi,

My Python code for basic_consume and callback registration looks like this:

# Callback invoked whenever an AMQP message arrives, for any thread
def callback(ch, method, properties, body):
    print json.dumps(xmltodict.parse(body.replace(">\n",">")))


# Start listening on the queue for the Vblock specified by objVCE
def listenEvents(objVCE):
    credentials = pika.PlainCredentials(objVCE.strSTE,'')
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=objVCE.vBlock.getHost(),credentials=credentials, port=5672, ssl=False,virtual_host='/',socket_timeout=5000))
    channel = connection.channel()

    channel.queue_declare(queue='AMQPEvents', durable=True)
    channel.queue_bind(exchange='fmdefaultexchange',queue='AMQPEvents',routing_key='*.*')
    args = { "host":objVCE.vBlock.getHost() }

    # Register the callback to be invoked when any AMQP message is received
    channel.basic_consume(callback,queue='AMQPEvents',no_ack=True, arguments=args)




My question:
I want to pass an argument to the callback function when registering it, so that the argument is available in the callback whenever an event is received.
I see that basic_consume has an arguments parameter. Is that argument passed to the callback function?

If so, how do I receive it in the callback, given that the callback's signature would have to change?

Any examples?

vitaly numenta

Oct 15, 2015, 2:16:46 PM
to Pika
The arguments parameter in basic_consume is sent to the broker; it is not passed to your callback. For reference, look up the description of basic.consume in http://www.rabbitmq.com/amqp-0-9-1-quickref.html.

A common way to do what you are asking is with functools.partial from the standard library; see https://docs.python.org/2/library/functools.html.
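
A minimal sketch of the functools.partial approach, for illustration only. The callback name on_message and the host value "vblock.example.com" are made up here, and the delivery is simulated with placeholder values so the snippet runs without a broker:

```python
import functools

def on_message(ch, method, properties, body, host):
    # host is the extra argument, bound once at registration time
    return "%s: %s" % (host, body)

# Bind the extra argument; the resulting callable takes only the four
# arguments that the consumer callback is invoked with.
callback = functools.partial(on_message, host="vblock.example.com")

# With a real channel, registration would mirror the call in the question:
# channel.basic_consume(callback, queue='AMQPEvents', no_ack=True)

# Simulate one delivery with placeholder values for ch, method, properties.
result = callback(None, None, None, "<event/>")
```

Note that partial evaluates the bound value once, when the callback is created, so the host is fixed for the lifetime of the consumer.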

Another way is to wrap your callback in a Python lambda expression, e.g.:

def my_callback_with_extended_args(ch, method, properties, body, host):
    # host is the extra argument supplied by the lambda below
    print json.dumps(xmltodict.parse(body.replace(">\n",">")))

channel.basic_consume(
  lambda ch, method, properties, body: my_callback_with_extended_args(ch, method, properties, body, host=objVCE.vBlock.getHost()),
  queue='AMQPEvents',
  no_ack=True)



Shreyans Soni

Oct 16, 2015, 2:39:00 AM
to Pika
Hey, thanks Vitaly, it really helped.