Disabling consumer prefetch does not work

Dao-hui Chen

Mar 2, 2015, 10:26:37 PM
to rabbitm...@googlegroups.com
In my use case, I need to keep consumer prefetch disabled so that messages stay in the ready state rather than the unacked state (so that they can expire).

I use rabbitmq-c to connect to a 3.4.3 cluster and set the consumer prefetch count to 0 using the following code:

...connecting to queue...
...declare temp queue...

amqp_basic_qos(trans->conn, 1, 0, 1, 0);

...begin to consume from queue...

But when I have slow workers, messages are still sent down to those workers and sit in the unacked state.

rabbitmqctl reports:

aaroot@aamqtest1:~$ sudo rabbitmqctl list_channels name prefetch_count messages_unacknowledged|grep 226
1.10.29.226:33061 -> 1.16.51.3:5672 (1)   0       0
1.10.29.226:36153 -> 1.16.51.3:5672 (1)   1       37
1.10.29.226:36154 -> 1.16.51.3:5672 (1)   1       41
1.10.29.226:36155 -> 1.16.51.3:5672 (1)   1       43
1.10.29.226:36156 -> 1.16.51.3:5672 (1)   1       42
1.10.29.226:36157 -> 1.16.51.3:5672 (1)   1       8
1.10.29.226:36158 -> 1.16.51.3:5672 (1)   1       29
1.10.29.226:36159 -> 1.16.51.3:5672 (1)   1       6
1.10.29.226:36160 -> 1.16.51.3:5672 (1)   1       112
1.10.29.226:36161 -> 1.16.51.3:5672 (1)   1       103
1.10.29.226:36162 -> 1.16.51.3:5672 (1)   1       6
1.10.29.226:36163 -> 1.16.51.3:5672 (1)   1       50

Is there a problem with my settings?

Alvaro Videla

Mar 3, 2015, 3:08:09 AM
to Dao-hui Chen, rabbitm...@googlegroups.com
Hi,

A prefetch count of 0 means "unlimited prefetch"; the minimum you could use is 1.

Is there a reason why you don't basic.cancel the consumers, and then send a basic.consume when your consumers are ready to receive messages?

Regards,

Alvaro


--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Dao-hui Chen

Mar 3, 2015, 3:47:50 AM
to rabbitm...@googlegroups.com
Hi,

The arguments to amqp_basic_qos (in rabbitmq-c) are (connection, channel, prefetch_size, prefetch_count, global), so my prefetch count is actually set to 1, which the rabbitmqctl output confirms.

I tried sending basic_cancel in the worker, but messages are still prefetched.

Simon MacMullen

Mar 3, 2015, 8:12:15 AM
to Dao-hui Chen, rabbitm...@googlegroups.com
What prefetch limit does "rabbitmqctl list_consumers" show?

Note that if you set the prefetch count after creating the consumer, it
won't apply.

Cheers, Simon

Dao-hui Chen

Mar 3, 2015, 10:49:00 PM
to Simon MacMullen, rabbitm...@googlegroups.com
Hi,

I set up QoS before calling amqp_basic_consume(), after logging in and declaring a temp queue (the temp queue is not used in this situation).

Output of list_consumers:

aaroot@aamqtest1:~$ sudo rabbitmqctl list_consumers -p /aa_debug
Listing consumers ...
REQ     <rab...@aamqtest2.2.25501.597>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.31445.597>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.4010.598>   DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.5508.598>   DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.9635.598>   DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.14898.598>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.20280.598>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.22302.598>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.26874.598>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.28112.598>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.6255.599>   DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.18171.803>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.22236.803>  DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.6098.804>   DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.5634.805>   DEAFULT_CONSUMER_TAG    true    1       []
REQ     <rab...@aamqtest2.2.20738.805>  DEAFULT_CONSUMER_TAG    true    1       []

Simon MacMullen

Mar 4, 2015, 5:29:36 AM
to rabbitm...@googlegroups.com, si...@rabbitmq.com
So that suggests the consumers are correctly getting their individual prefetch count set - but each channel still has multiple messages unacknowledged. There are two ways this could happen:

* You are creating multiple consumers per channel, possibly by mistake (what does "rabbitmqctl list_channels name consumer_count" say?)

or

* You are also invoking basic.get mixed in with basic.consume

Cheers, Simon

Dao-hui Chen

Mar 4, 2015, 9:05:53 AM
to Simon MacMullen, rabbitm...@googlegroups.com
The consumer count is 1 on every channel:

Listing channels ...

I'm using rabbitmq-c's amqp_basic_consume() followed by amqp_consume_message(), and my understanding is that these two functions use basic.consume rather than basic.get.


--
You received this message because you are subscribed to a topic in the Google Groups "rabbitmq-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/rabbitmq-users/7x4v_gFcsVs/unsubscribe.
To unsubscribe from this group and all its topics, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

Dao-hui Chen

Mar 5, 2015, 9:31:55 PM
to Simon MacMullen, rabbitm...@googlegroups.com
Found something:

My main queue-handling logic is:

connect ()
login()

while(1)
{
  amqp_basic_qos(prefetch_count=1)
  amqp_basic_consume(A_QUEUE_CREATED_ON_SERVER)
  amqp_basic_cancel()
  do_some_business_logic_maybe_take_some_time_say_10_sec();
}

If I remove basic_cancel and keep the qos setting, messages stay in the queue in the ready state.
If I remove qos and keep the cancel, messages are sent down to this channel and wait for the consumer.
If I keep both cancel and qos, messages are still sent down to the channel.

Is there a particular AMQP call sequence I should be following?

Simon MacMullen

Mar 6, 2015, 5:15:53 AM
to Dao-hui Chen, rabbitm...@googlegroups.com
On 06/03/2015 02:31, Dao-hui Chen wrote:
> while(1)
> {
> amqp_basic_qos(prefetch_count=1)
> amqp_basic_consume(A_QUEUE_CREATED_ON_SERVER)
> amqp_basic_cancel()
> do_some_business_logic_maybe_take_some_time_say_10_sec();
> }

Err, what? You should not need to repeatedly consume in a loop. I
suspect this might be the source of your problems - you consume once at
startup (or more than once if you want to listen to multiple queues) and
then the server sends you messages as they are available (respecting
prefetch_count).

If you consume / cancel each time you get a message, each new consumer
has its own prefetch count so each time you go through the loop you can
get one more unacked message.

Cheers, Simon
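
Simon's point about each new consumer getting its own prefetch allowance can be illustrated with a toy model. This is only a sketch of the accounting he describes, not rabbitmq-c or broker code: the function names are made up for illustration, and it assumes that a message delivered to a consumer that is then cancelled stays unacked on the channel.

```c
#include <assert.h>

/* Toy model only. These helpers are hypothetical and do not call
 * rabbitmq-c or talk to a broker. */

/* Messages a brand-new consumer can be handed at once.
 * prefetch == 0 is treated as "unlimited", as noted earlier in the thread. */
static int deliveries_for_new_consumer(int ready, int prefetch)
{
    if (prefetch == 0 || prefetch > ready) {
        return ready;
    }
    return prefetch;
}

/* Pattern from the question: basic.consume then basic.cancel on every pass.
 * Assumption: deliveries to a cancelled consumer remain unacked on the channel. */
int unacked_after_consume_cancel_loop(int queued, int prefetch, int passes)
{
    assert(queued >= 0 && prefetch >= 0 && passes >= 0);
    int ready = queued;
    int unacked = 0;
    for (int i = 0; i < passes; i++) {
        /* Each pass creates a fresh consumer with a fresh allowance. */
        int n = deliveries_for_new_consumer(ready, prefetch);
        ready -= n;
        unacked += n;
    }
    return unacked;
}

/* Pattern Simon recommends: consume once, then handle and ack one
 * message per pass. Returns the highest unacked count observed. */
int peak_unacked_with_single_consumer(int queued, int prefetch, int passes)
{
    assert(queued >= 0 && prefetch >= 0 && passes >= 0);
    int ready = queued;
    int unacked = 0;
    int peak = 0;
    for (int i = 0; i < passes; i++) {
        /* The single consumer is topped up to its prefetch limit. */
        while (ready > 0 && (prefetch == 0 || unacked < prefetch)) {
            ready--;
            unacked++;
        }
        if (unacked > peak) {
            peak = unacked;
        }
        if (unacked > 0) {
            unacked--; /* worker finishes one message and acks it */
        }
    }
    return peak;
}
```

With 200 queued messages and a prefetch count of 1, 37 passes of the consume/cancel pattern leave 37 messages unacked in this model, while the single-consumer pattern never holds more than 1.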