prefetch_count + pika official async consumer


Nikos Skalis

Oct 12, 2015, 3:18:17 PM
to rabbitmq-users
hi guys,

i would need your expertise and advice:

with the following two changes:

[1] setting prefetch_count to 10:

    PREFETCH_COUNT = 10

    def __init__(self, amqp_url):

    ...

    def on_bindok(self, unused_frame):
        ip_spotlight_log.info("%s > Queue bound", sys.argv[1])
        self._channel.basic_qos(prefetch_count=self.PREFETCH_COUNT, prefetch_size=0)
        self.start_consuming()

[2] exiting after the 1st call to basic_consume:

    def on_message(self, unused_channel, basic_deliver, properties, body):
        ip_spotlight_log.info("%s > Received message # %s from %s: %s", sys.argv[1], basic_deliver.delivery_tag, properties.app_id, body)
        self.acknowledge_message(basic_deliver.delivery_tag)
        sys.exit()

while the queue holds 8000 messages, i would expect 1 call to basic_consume to deliver 10 messages, but i am getting only 1:

2015-10-12 21:04:11.369 : INFO : SNIP : on_message : SNIP > Received message # 1 from SNIP: SNIP

2015-10-12 21:04:11.369 : INFO : SNIP : acknowledge_message : SNIP > Acknowledging message 1


could you please help me understand what i am doing wrong ?

Nikos

Alvaro Videla

Oct 12, 2015, 3:21:02 PM
to rabbitm...@googlegroups.com
basic_consume sets up a consumer that is subscribed to a queue; in your particular case, that consumer will have at most 10 unacknowledged messages outstanding at any one time.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Nikos Skalis

Oct 12, 2015, 3:22:13 PM
to rabbitm...@googlegroups.com
thanks Alvaro. but the question is why am i not getting 10 messages? please see the log output above.


Tom Molesworth

Oct 12, 2015, 3:24:03 PM
to rabbitm...@googlegroups.com
because you exit in the message handler? If you want to process 10 messages, you'll have to let that message handler get called 10 times. How do you expect the other messages to be processed if you've already exited after the first one?

If you only want to process 10 messages then exit without any more being delivered, then don't acknowledge anything until all 10 messages have been processed, and stop the consumer before the ACK.
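
A minimal sketch of the sequence described here (collect without acking, stop the consumer, process, then ack once), assuming a pika-style channel that exposes basic_cancel(consumer_tag=...) and basic_ack(delivery_tag=..., multiple=True). OneShotBatch, RecordingChannel, demo and the process callback are hypothetical names introduced for illustration; RecordingChannel only stands in for a real channel so the sketch runs without a broker.

```python
from types import SimpleNamespace


class RecordingChannel:
    """Stand-in for a real channel (assumption: demo use only, no broker)."""

    def __init__(self):
        self.calls = []

    def basic_cancel(self, consumer_tag):
        self.calls.append(("cancel", consumer_tag))

    def basic_ack(self, delivery_tag, multiple=False):
        self.calls.append(("ack", delivery_tag, multiple))


class OneShotBatch:
    """Collect `size` deliveries unacked, then cancel, process, and ack once."""

    def __init__(self, channel, consumer_tag, size, process):
        self.channel = channel            # pika-style channel (or a stand-in)
        self.consumer_tag = consumer_tag  # tag identifying the consumer to stop
        self.size = size                  # should not exceed prefetch_count
        self.process = process            # hypothetical callback taking a list of bodies
        self.bodies = []
        self.done = False

    def on_message(self, unused_channel, basic_deliver, properties, body):
        if self.done:
            return  # ignore deliveries that arrive after the batch is full
        self.bodies.append(body)
        if len(self.bodies) < self.size:
            return  # keep waiting; nothing has been acked yet
        self.done = True
        # Stop the consumer first so the ack below cannot trigger more deliveries.
        self.channel.basic_cancel(consumer_tag=self.consumer_tag)
        self.process(self.bodies)
        # One ack with multiple=True covers every delivery up to this tag.
        self.channel.basic_ack(delivery_tag=basic_deliver.delivery_tag, multiple=True)


def demo():
    channel = RecordingChannel()
    seen = []
    batch = OneShotBatch(channel, "ctag-1", 3, seen.extend)
    for tag in (1, 2, 3, 4):  # the 4th delivery arrives after the batch is full
        batch.on_message(channel, SimpleNamespace(delivery_tag=tag), None, b"msg%d" % tag)
    return seen, channel.calls
```

With a real consumer, on_message would take the place of the handler in the original post, and closing the channel afterwards is left to the caller. The batch size should not exceed PREFETCH_COUNT, because the broker stops delivering once that many messages are unacknowledged.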

Nikos Skalis

Oct 12, 2015, 3:31:50 PM
to rabbitm...@googlegroups.com
thanks Tom. could you please advise on how to get 10 messages all at once and handle them all together?
in other words, let's say get PREFETCH_COUNT messages (if available) with 1 call to basic_consume, and store those PREFETCH_COUNT messages in a list?

Nikos

Tom Molesworth

Oct 12, 2015, 3:41:40 PM
to rabbitm...@googlegroups.com
That was the second paragraph of my message?

Accept messages without an ACK. When you have all 10, stop the consumer. Process the messages. Ack them with the multiple flag. Close channel. Disconnect. Exit. You'll need to handle timeouts and errors, of course, but if you do things in this order you can guarantee that you'll only get 10 messages.

Or just use basic.get.
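
A rough sketch of the basic.get alternative, assuming pika's BlockingConnection channel, where basic_get(queue=...) returns a (method, properties, body) tuple and method is None when the queue is empty. The asynchronous adapter used in the original post delivers the result through a callback instead, so this shape would not apply there unchanged. fetch_batch, ListBackedChannel, demo and the queue name are hypothetical, and ListBackedChannel only stands in for a real channel so the sketch runs without a broker.

```python
from types import SimpleNamespace


class ListBackedChannel:
    """Stand-in for a blocking channel (assumption: demo use only, no broker)."""

    def __init__(self, bodies):
        self._pending = list(bodies)
        self._next_tag = 0
        self.acks = []

    def basic_get(self, queue):
        if not self._pending:
            return None, None, None  # mirrors the "queue is empty" result
        self._next_tag += 1
        method = SimpleNamespace(delivery_tag=self._next_tag)
        return method, None, self._pending.pop(0)

    def basic_ack(self, delivery_tag, multiple=False):
        self.acks.append((delivery_tag, multiple))


def fetch_batch(channel, queue, batch_size):
    """Pull up to batch_size messages; return (last_delivery_tag, bodies)."""
    bodies = []
    last_tag = None
    while len(bodies) < batch_size:
        method, properties, body = channel.basic_get(queue=queue)
        if method is None:
            break  # nothing left in the queue right now
        bodies.append(body)
        last_tag = method.delivery_tag
    return last_tag, bodies


def demo():
    channel = ListBackedChannel([b"a", b"b", b"c", b"d", b"e"])
    first = fetch_batch(channel, "example-queue", 3)
    # After processing the batch, a single ack covers all of its deliveries.
    channel.basic_ack(delivery_tag=first[0], multiple=True)
    second = fetch_batch(channel, "example-queue", 3)  # only two messages remain
    return first, second, channel.acks
```

Polling with basic.get costs one round trip per message, so it tends to suit occasional fixed-size pulls better than sustained high-throughput consumption.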

Nikos Skalis

Oct 12, 2015, 3:51:09 PM
to rabbitm...@googlegroups.com
thanks Tom. the sys.exit() is for debugging purposes. could you please provide a code example with only the modification you mentioned?
how should i check when i have 10 messages? this should be regulated by prefetch_count, correct?
could you please provide an example, as this is the 1st time i am using rabbitmq?

Nikos

Nikos Skalis

Oct 12, 2015, 4:12:31 PM
to rabbitm...@googlegroups.com
in other words, i would like to process all messages (a number of messages equal to PREFETCH_COUNT) at once, and not process every message one by one. how can this be achieved?

Nikos

vitaly numenta

Oct 12, 2015, 4:51:39 PM
to rabbitmq-users
The client library calls your on_message function every time a new message arrives. The client library doesn't aggregate messages before calling your on_message function. self._channel.basic_qos(...) simply tells RabbitMQ that it should suspend delivering messages to your consumer when the number of unacknowledged messages reaches PREFETCH_COUNT. It does NOT tell RabbitMQ to batch the messages.

vitaly numenta

Oct 12, 2015, 4:54:32 PM
to rabbitmq-users
So, if you need to process the messages in batches, it's up to your code to aggregate the messages.

Tom Molesworth

Oct 12, 2015, 4:57:01 PM
to rabbitm...@googlegroups.com
just as described, but don't stop the consumer or exit after processing a batch?
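
One way the continuous variant might look, as a sketch only: aggregate bodies in on_message, hand each full batch to a processing callback, and acknowledge the whole batch with a single basic_ack(..., multiple=True) while the consumer keeps running. BatchingConsumer, RecordingChannel, demo and the process callback are hypothetical names; RecordingChannel stands in for a real channel so the sketch runs without a broker. A timer that calls flush() for partial batches is assumed but not shown.

```python
from types import SimpleNamespace


class RecordingChannel:
    """Stand-in for a real channel (assumption: demo use only, no broker)."""

    def __init__(self):
        self.acks = []

    def basic_ack(self, delivery_tag, multiple=False):
        self.acks.append((delivery_tag, multiple))


class BatchingConsumer:
    """Aggregate deliveries and process them batch_size at a time."""

    def __init__(self, channel, batch_size, process):
        self.channel = channel
        self.batch_size = batch_size  # must not exceed prefetch_count
        self.process = process        # hypothetical callback taking a list of bodies
        self.bodies = []
        self.last_tag = None

    def on_message(self, unused_channel, basic_deliver, properties, body):
        self.bodies.append(body)
        self.last_tag = basic_deliver.delivery_tag
        if len(self.bodies) >= self.batch_size:
            self.flush()

    def flush(self):
        """Process whatever is buffered; also usable from a timeout."""
        if not self.bodies:
            return
        batch, self.bodies = self.bodies, []
        self.process(batch)
        # Acking up to the last tag lets the broker send the next batch.
        self.channel.basic_ack(delivery_tag=self.last_tag, multiple=True)


def demo():
    channel = RecordingChannel()
    batches = []
    consumer = BatchingConsumer(channel, 3, batches.append)
    for tag in range(1, 8):  # seven deliveries with batch_size 3
        consumer.on_message(channel, SimpleNamespace(delivery_tag=tag), None, tag)
    consumer.flush()  # flush the partial batch, as a timeout handler might
    return batches, channel.acks
```

If batch_size were larger than PREFETCH_COUNT, the broker would stop delivering before a batch ever filled and nothing would be acked, so the two values need to be chosen together.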

Nikos Skalis

Oct 13, 2015, 2:42:03 AM
to rabbitm...@googlegroups.com
thank you guys. now it is clear to me.
may i ask if you can think of any better solution than spawning a (multiprocessing) process that looks into the Queue (where the messages are aggregated)?
Alternatively, I could use Redis, but that would be overkill, I think.

Nikos