[rabbitmq-discuss] flow control issues

romary...@gmail.com

Sep 8, 2010, 5:33:03 AM
to rabbitmq...@lists.rabbitmq.com
We are evaluating the RabbitMQ message broker, and we are currently
having difficulty understanding how flow control works.

- Our application involves 10,000 peers periodically producing messages
to a single queue. This queue is consumed asynchronously by another peer.
- All peers are written in Java, using amqp-client-1.8.1.
- The production rate of a single peer is 4 messages / hour.
- We can simulate a time-consuming task in the consumer callback, to
model a more or less fast consumer.
- We are using an SSL certificate on the broker side so that peers can
authenticate the broker.
- We have noticed that the use of SSL has a dramatic impact on the
memory occupied by the RabbitMQ process.

- We have the option of adding a second consumer to load-balance the
consumption of messages.
- We are using a prefetch window of 1 message to enable credit-based
flow control in this case (a minimal sketch of this consumer setup follows below).
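
For reference, here is a minimal sketch (not our actual code) of the kind
of consumer setup we use; the host and queue names are placeholders, and
the connection-setup part of the API differs a bit between client versions,
so take the factory calls as illustrative only:

import com.rabbitmq.client.*;
import java.io.IOException;

public class SingleQueueConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("broker-host");              // placeholder host
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.basicQos(1);                         // prefetch window of 1 unacked message
        channel.basicConsume("the.queue", false,     // autoAck = false, we ack explicitly
            new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    // simulate a more or less time-consuming task here,
                    // then ack one message at a time
                    getChannel().basicAck(envelope.getDeliveryTag(), false);
                }
            });
    }
}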

- We have set up several monitoring indicators on the broker side:
- virtual memory occupied by the rabbitmq process
- CPU load
- queue depth
- disk usage

- Our test scenario is as follows:
- during the first 5 hours, all peers join in (producers and
consumers)
- after 5 hours, producers stop publishing messages
- the test goes on for a configurable duration to allow the consumer
to finish emptying the queue

During long-running tests, we have encountered strange behaviour due
to flow control:

The queue depth increases linearly for about 2 hours, which is
consistent, since the message throughput of the single consumer
is not enough to absorb the message ingress. Memory occupation grows
even faster, until the memory watermark is reached on the broker side.

From that point, the producers are indeed paused, as a flow control
request has been issued by the broker, but the consumer seems to be
blocked as well. The queue depth stays flat at its peak value until the end of
the test, even when memory occupation drops back below the threshold.

By registering the FlowListener callback, we have noticed that not all
of the producers are notified every time the alarm handler is set.
Does this mean that the broker applies some heuristic to avoid blocking
everybody every time?
Or does it mean that some of the channels have somehow been
blacklisted by the broker?

Could anybody explain how the blocking of the consumer is supposed to be
implemented?
Does the call to Channel.publish() somehow block the connection
thread?
How come the consumer connection is also blocked?
Could implementing the FlowListener interface help to handle flow
control requests?
(At first glance I thought that flow control had to be implemented
by hand using this interface,
but after reading http://hopper.squarespace.com/blog/2008/11/9/flow-control-in-rabbitmq.html
it seems that this is not the case anyway.)

Best regards,

Romary.

Romary Kremer

Sep 8, 2010, 6:49:14 AM
to rabbitmq...@lists.rabbitmq.com
One more thing I forgot to mention:

- The situation described above was encountered with the RabbitMQ 1.8.1
broker.

I've started playing a bit with the latest release, 2.0.0, and I'm
afraid it looks like there are some regressions, or at least some
semantic changes.

I've just modified the MultiCastMain sample provided with the Java
client library to add a FlowListener on both the producer and the
consumer, registering the listener on their respective channels
(sketch below).
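
Roughly, the hook looks like this (a sketch of the idea rather than the
exact patch; `channel` is the producer's or consumer's
com.rabbitmq.client.Channel, and the registration method name is from the
javadoc as I remember it, so it may differ slightly between client versions):

channel.setFlowListener(new FlowListener() {
    public void handleFlow(boolean active) throws IOException {
        // active == false : the broker asked us to pause publishing (channel.flow)
        // active == true  : the broker allows publishing again
        System.out.println("channel.flow received, active=" + active);
    }
});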

It looks like none of the listeners are called back when the alarm handler is
set or cleared, while the producers are still paused / resumed
as they are supposed to be.

Does this reflect a change in the broker's internal flow control
mechanism, or a regression in the Java client stack?

Furthermore, what about publishing the client library to the
Maven repository?

On 8 Sep 2010, at 11:33, romary...@gmail.com wrote:

Marek Majkowski

Sep 10, 2010, 9:06:56 AM
to Romary Kremer, rabbitmq...@lists.rabbitmq.com
Romary,

First, thanks a lot for the feedback. The information about
SSL memory usage, for example, is very interesting. (If that is a big
problem for you, you can always fall back to the old technique of
using stunnel.)

Flow control was heavily modified between 1.8.1 and 2.0.0. In summary:
- 1.8.1 - we sent a Channel.flow AMQP message to everyone once
rabbit reached the memory limit
- 2.0.0 - once we reach the memory limit, the connections from which we hear
publishes are stopped temporarily: we stop receiving bytes from their TCP sockets.
That 'stop' shouldn't last too long, as data should be swapped out to disk
and memory pressure will drop pretty quickly.


On Wed, Sep 8, 2010 at 11:49, Romary Kremer <romary...@gmail.com> wrote:
> I've started playing a bit with the latest release, 2.0.0, and I'm afraid
> it looks like there are some regressions, or at least some semantic
> changes.

It's best if you upgrade both the server and the client library. Do you have any
particular problems? A lot changed in 2.0.0, but we think it's fully
functional. If you find something that blocks you from migrating, please
report it as a bug.

> It looks like none of the listeners are called back when the alarm handler is
> set or cleared, while the producers are still paused / resumed
> as they are supposed to be.

Interesting. Maybe we have a race there? Or maybe you're blocking
the main java client thread? (nothing blocking should be done from
the main thread)

>> during long running tests, we have encountered strange behaviour due to
>> flow control :
>>
>> The queue depth starts to increase linearly for about 2 hours, these is
>> coherent since the message throughput of the single consumer
>> is not enough to absorb message ingress. Memory occupation grow faster as
>> well, until the memory watermark is reached on the broker side.

Are you sure your consumer is ACK-ing the messages it received?

>> From that point, the producers are indeed paused, as flow control request
>> has been issued by the broker, but the consumer seems to be blocked
>> as well. The queue level is flatten at its top value until the end of the
>> test, even when memory occupation lowered under the threshold.

That's how 1.8.1 behaves. In 2.0.0 we introduced swapping out big queues
to disk, so the memory usage shouldn't be dependent on a queue size.

>> By registering the FlowListener callback, we have noticed that not all of
>> the producers are notified all the time the alarm handler is set.
>> Does this mean that the broker applies some heuristic to try not to block
>> every body every time ?
>> Or does it mean that some of the channels have been somehow blacklisted by
>> the broker ?

No, in 1.8.1 the broker should send 'channel.flow' to all the channels.

>> Could anybody explain how the blocking of consumer is assumed to be
>> implemented ?

The best description is probably here:
http://www.rabbitmq.com/extensions.html#memsup

But it covers 2.0.0. I'd suggest an upgrade to 2.0.0 and monitoring
not only queue size but also number of unacknowledged messages
('Msg unack' in status plugin). This number should be near zero.
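
If you prefer the command line, the same counters can also be read with
rabbitmqctl (queue info items quoted from memory - check the rabbitmqctl
help for the exact names):

rabbitmqctl list_queues name messages_ready messages_unacknowledged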

Cheers,
Marek Majkowski

romary...@gmail.com

Sep 14, 2010, 4:07:20 AM
to rabbitmq...@lists.rabbitmq.com
Thanks Marek for your reply.

On 10 Sep 2010, at 15:06, Marek Majkowski wrote:

> Romary,
>
> First, thank a lot for the feedback. For example the information about
> SSL memory
> usage is indeed very interesting. (if that is a big problem to you,
> you may always
> fall back to the old technique of using stunnel)

Stunnel is not being considered for now. We would rather hear your opinion
on, and knowledge of, potential memory leaks due to the use of SSL
(maybe from the Erlang stack itself).
It seems that memory keeps growing even when the broker has no activity
other than the open connections. To confirm this, we ran a test in which
we just opened 10,000 connections and did nothing for 2 hours but observe
the memory occupation on the broker side.

> The flow control was heavily modified between 1.8.1 and 2.0.0. In
> summary:
> - 1.8.1 - we have send Channel.flow AMQP message to everyone once
> rabbit reached memory limit
> - 2.0.0 - once we reach memory limit, the connections from which we
> hear
> publishes are stopped temporarily. We stop receiving bytes from
> tcp sockets.
> That 'stop' shouldn't take too long, as data should be swapped out
> to disk
> and memory pressure will drop pretty quickly.

Do you mean that in 2.0.0 the Channel.flow AMQP message is no longer
sent to the producers that are stopped temporarily? That would explain why:

1) Channel.publish() can block on the client side when the broker
stops reading from the socket;

2) FlowListener.handleFlow() is no longer invoked on the registered
listener when the alarm handler is set or cleared.

Are my deductions right?

Do you have any figures to quantify "shouldn't take too long"? Are
there any test reports available about that major change?

>
>
> On Wed, Sep 8, 2010 at 11:49, Romary Kremer
> <romary...@gmail.com> wrote:
>> I've started playing a bit with the latest release 2.0.0 and I m
>> affraid
>> that it looks like their are some regression or at least some
>> semantic
>> updates.
>
> It's best if you upgraded both server and client library. Do you
> have any
> particular problems? A lot was changed in 2.0.0 but we think it's
> fully
> functional. If you found something that blocks you to migrate, you
> could report a bug.

Sorry if I wasn't clear in my previous post: we are already on 2.0.0
for both the broker and the client library.

>
>> It looks like any listener is called back when the alarm handler
>> is set or
>> cleared, while the producers are still paused / resumed
>> like their are to be.
>
> Interesting. Maybe we have a race there? Or maybe you're blocking
> the main java client thread? (nothing blocking should be done from
> the main thread)

I am quite sure I am not blocking the main thread, nor the connection
thread. All the message-related logic runs in a dedicated thread (some
kind of ProducerGroup pool of threads, actually).
Consumer callbacks run within the connection thread, if I refer
to the Javadoc!

With the same code using library version 1.8.1, the callbacks were
invoked when the alarm handler was set or cleared anyway.

>
>>> during long running tests, we have encountered strange behaviour
>>> due to
>>> flow control :
>>>
>>> The queue depth starts to increase linearly for about 2 hours,
>>> these is
>>> coherent since the message throughput of the single consumer
>>> is not enough to absorb message ingress. Memory occupation grow
>>> faster as
>>> well, until the memory watermark is reached on the broker side.
>
> Are you sure your consumer is ACK-ing the messages it received?

The consumer callback does ACK messages upon reception, one at a time
(multiple == false).
Is the basic.ack() method liable to be blocked by flow control, just
like publish()?

>
>>> From that point, the producers are indeed paused, as flow control
>>> request
>>> has been issued by the broker, but the consumer seems to be blocked
>>> as well. The queue level is flatten at its top value until the end
>>> of the
>>> test, even when memory occupation lowered under the threshold.
>
> That's how 1.8.1 behaves. In 2.0.0 we introduced swapping out big
> queues
> to disk, so the memory usage shouldn't be dependent on a queue size.

Good news, because we had identified 2 scenarios in which memory-based
channel flow was triggered:

- the use of SSL
- the use of larger messages (4 kB, same ingress)

Now I hope that the message size will no longer be so decisive for flow
control, as long as consumers are able to handle these messages regularly.

>
>>> By registering the FlowListener callback, we have noticed that not
>>> all of
>>> the producers are notified all the time the alarm handler is set.
>>> Does this mean that the broker applies some heuristic to try not
>>> to block
>>> every body every time ?
>>> Or does it mean that some of the channels have been somehow
>>> blacklisted by
>>> the broker ?
>
> No, in 1.8.1 broker should send 'channel.flow' to all the channels.

Strange, then; there must be something very weird going on.


>
>>> Could anybody explain how the blocking of consumer is assumed to be
>>> implemented ?
>
> The best description is probably here:
> http://www.rabbitmq.com/extensions.html#memsup
>
> But it covers 2.0.0. I'd suggest an upgrade to 2.0.0 and monitoring
> not only queue size but also number of unacknowledged messages
> ('Msg unack' in status plugin). This number should be near zero.
>

We are already on 2.0.0.
Where can I find some documentation about the status plugin, anyway?

Cheers, Romary.

romary...@gmail.com

Sep 15, 2010, 9:45:50 AM
to rabbitmq...@lists.rabbitmq.com
Hi,
Since I cannot find the reasons for our problems yet, I've decided to perform some more basic tests.

Here are the latest results I've gathered by doing some tests with the MultiCastMain program from the Java client 2.0.0.

First of all, let me introduce the modifications I made to MultiCastMain to allow more flexibility in the configuration:
- optional parameter "-D" to toggle durability of the exchange and queue ON (absent means OFF)
- optional parameter "-A" to toggle auto-delete of the queue ON (absent means OFF)
- optional parameter "-X" to toggle exclusivity of the queue ON (absent means OFF)
- optional parameter "-Q" to give the queue a name: useful to fetch messages from the same queue across several executions
- optional parameter "-K" to use the same routing key (for the producer) and binding key (for the consumer) across several executions

Below is a snippet of /etc/rabbitmq/rabbitmq.config:
[
   {rabbit, [
     {vm_memory_high_watermark, 0.05},
     {ssl_listeners, [{"0.0.0.0",5671}]},
     {ssl_options, [{cacertfile,"/var/lib/rabbitmq/ssl/certificate-authority/schneider-ca.pem"},
                   {certfile,"/var/lib/rabbitmq/ssl/broker-certs/broker-cert.pem"},
                     {keyfile,"/var/lib/rabbitmq/ssl/broker-certs/broker-key.pem"},
                     {verify,verify_none},
                     {fail_if_no_peer_cert,false}]}
   ]}
].

To hit the memory limit faster, I've decreased the memory watermark threshold to 0.05 (5%).
This corresponds to 50 MB on the host we are using for this test, as shown in the rabbit.log file upon broker startup:

=INFO REPORT==== 15-Sep-2010::12:36:15 ===
Limiting to approx 65435 file handles (58889 sockets)

=INFO REPORT==== 15-Sep-2010::12:36:15 ===
Memory limit set to 50MB.

Finally, the broker status (rabbitmqctl status):

[{running_applications,[{rabbit_status,"RabbitMQ Status Page","0.01"},
                        {rabbit,"RabbitMQ","2.0.0"},
                        {ssl,"Erlang/OTP SSL application","3.10.7"},
                        {public_key,"Public key infrastructure","0.4"},
                        {mnesia,"MNESIA  CXC 138 12","4.4.12"},
                        {os_mon,"CPO  CXC 138 46","2.2.4"},
                        {sasl,"SASL  CXC 138 11","2.1.8"},
                        {rabbit_mochiweb,"RabbitMQ Mochiweb Embedding","0.01"},
                        {mochiweb,"MochiMedia Web Server","1.3"},
                        {crypto,"CRYPTO version 1","1.6.3"},
                        {inets,"INETS  CXC 138 49","5.2"},
                        {stdlib,"ERTS  CXC 138 10","1.16.4"},
                        {kernel,"ERTS  CXC 138 10","2.13.4"}]},
 {nodes,[{disc,['rabbit@mystic-buntu']}]},
 {running_nodes,['rabbit@mystic-buntu']}]

Thanks to the modifications to MultiCastMain, I've run a two-phase producer/consumer test as described below:

Phase 0 - initialization of the exchange and queue for the test:
MultiCastMain -h <host> -p 5672 -s 1000 -n 1 -x 0 -y 1 -e testX -Q testQ -K testK -D -z 10
This launches a single consumer creating the exchange testX and the queue testQ with durability ON.
The queue is bound to the exchange using the key testK.
The test is told to end after 10 seconds.

Phase 1 - launch a producer alone and watch memory occupation, connection status and queue depth (via the rabbit_status plugin, for instance):
MultiCastMain -h <host> -p 5672 -s 1000 -n 1 -x 1 -y 0 -e testX -Q testQ -K testK -D
This runs a producer that publishes 1 kB messages to the testX exchange with routing key testK.
These messages are queued in testQ.

The producer is blocked after only a few seconds of running, as the memory threshold is reached. The queue depth indicated 12,011 messages.
(You can scroll to the end of this mail for an extract of the broker log illustrating the alarm_handler notifications.)

What do I mean by "the producer is blocked":
- the rabbit_status plugin shows the connection in status "blocked"
- the rabbit_status plugin shows that the depth of queue testQ remains constant
- the trace from MultiCastMain stops displaying new sending-rate stats after a while
- the last lines in the broker log are:

=INFO REPORT==== 15-Sep-2010::14:53:44 ===
vm_memory_high_watermark set. Memory used:53650536 allowed:52556595

=INFO REPORT==== 15-Sep-2010::14:53:44 ===
alarm_handler: {set,{vm_memory_high_watermark,[]}}

From that point, the producer is blocked with no possibility of recovery. Given how flow control is designed in v2.0.0, I would have expected the producer
to be released, thanks to the swapping of messages to disk. My second assumption was that a consumer is needed to release pressure on the queue,
so I decided to keep the producer running and start phase 2 as follows:

Phase 2 - launch a consumer alone and watch memory occupation, connection status and queue depth (via the rabbit_status plugin, for instance):
MultiCastMain -h <host> -p 5672 -s 1000 -n 1 -x 0 -y 1 -e testX -Q testQ -K testK -D
This runs a consumer that gets messages out of the queue testQ.

The messages from testQ are removed quite fast, but the memory occupation does not seem to be affected at all, and the producer is still blocked.
According to the rabbit_status plugin, the status of the connections remains:
- "blocked" for the producer
- "blocking" for the consumer

And memory usage is still over the threshold: memory (used/available) = 53 MB / 50 MB.

The only way I found to let memory fall back below the threshold is to manually kill the producer's connection.
Killing the producer process on the client side is not enough, as it turns out:
if I kill the producer process, the connection seems to disappear but memory remains over the threshold.
The rabbit_status plugin seems to fail after this, because I got a strange error report in the broker log (see the end of this long mail!).
By closing the connection on the broker side, memory has (though not always) fallen back below the threshold, back to normal (32 MB / 50 MB).

I think this behaviour is not expected and may be due to a bug somewhere, since it is fully reproducible on my configuration.
The fact that the memory occupation never falls below the threshold after messages are removed from the queue is particularly strange and
unwelcome from my point of view. I think this simple test may point to an issue that would explain the problems I mentioned in the
previous messages, but I am not sure about it.

I am sorry for this rather long message, but I thought the more details you get, the better.

Best regards,

Romary.

The following is an extract from the broker log during phase 1:

=INFO REPORT==== 15-Sep-2010::14:53:24 ===
accepted TCP connection on 0.0.0.0:5672 from 192.168.0.71:1845

=INFO REPORT==== 15-Sep-2010::14:53:24 ===
starting TCP connection <0.16120.0> from 192.168.0.71:1845

=INFO REPORT==== 15-Sep-2010::14:53:42 ===
vm_memory_high_watermark set. Memory used:52632808 allowed:52556595

=INFO REPORT==== 15-Sep-2010::14:53:42 ===
alarm_handler: {set,{vm_memory_high_watermark,[]}}

=INFO REPORT==== 15-Sep-2010::14:53:43 ===
vm_memory_high_watermark clear. Memory used:52355320 allowed:52556595

=INFO REPORT==== 15-Sep-2010::14:53:43 ===
alarm_handler: {clear,vm_memory_high_watermark}

=INFO REPORT==== 15-Sep-2010::14:53:44 ===
vm_memory_high_watermark set. Memory used:53650536 allowed:52556595

=INFO REPORT==== 15-Sep-2010::14:53:44 ===
alarm_handler: {set,{vm_memory_high_watermark,[]}}

The following is an extract from the broker log during phase 2:
=INFO REPORT==== 15-Sep-2010::15:21:38 ===
accepted TCP connection on 0.0.0.0:5672 from 192.168.0.71:1880

=INFO REPORT==== 15-Sep-2010::15:21:38 ===
starting TCP connection <0.2101.1> from 192.168.0.71:1880

The following is the error report after shutting down the producer process (client side):

=ERROR REPORT==== 15-Sep-2010::15:37:53 ===
** Generic server rabbit_status_web terminating
** Last message in was get_context
** When Server state == {state,1284557846311,"Wed, 15 Sep 2010 11:37:26 GMT",
       [<<"0.0.0.0:5672 ">>],
       [[{pid,<<"<0.16120.0>">>},
         {address,<<"192.168.0.58">>},
         {port,5672},
         {peer_address,<<"192.168.0.71">>},
         {peer_port,1845},
         {recv_oct,12696041},
         {recv_cnt,72079},
         {send_oct,357},
         {send_cnt,5},
         {send_pend,0},
         {state,<<"blocked">>},
         {channels,1},
         {user,<<"guest">>},
         {vhost,<<"/">>},
         {timeout,0},
         {frame_max,131072}]],
       [[{vhost,<<"/">>},
         {name,<<"testQ">>},
         {durable,<<"true">>},
         {auto_delete,<<"false">>},
         {arguments,<<"[]">>},
         {pid,<<"<0.12150.0>">>},
         {messages_ready,0},
         {messages_unacknowledged,0},
         {messages_uncommitted,<<"undefined">>},
         {messages,0},
         {acks_uncommitted,<<"undefined">>},
         {consumers,0},
         {transactions,<<"undefined">>},
         {memory,6171332}]],
       33,65535,55843456,52556595.0,161,1048576}
** Reason for termination ==
** {{badmatch,{error,enotconn}},{gen_server,call,[<0.16120.0>,info,infinity]}}

=ERROR REPORT==== 15-Sep-2010::15:37:53 ===
** Generic server <0.16124.0> terminating
** Last message in was {'EXIT',<0.16123.0>,{badmatch,{error,enotconn}}}
** When Server state == {ch,running,1,<0.16120.0>,<0.16122.0>,undefined,none,
       {set,0,16,16,8,80,48,
            {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
            {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
       1,
       {[],[]},
       {[],[]},
       <<"guest">>,<<"/">>,<<>>,
       {dict,0,16,16,8,80,48,
             {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
             {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
       {dict,0,16,16,8,80,48,
             {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
             {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
       <0.16121.0>,
       {state,none,undefined}}
** Reason for termination ==
** {badmatch,{error,enotconn}}



On 10 Sep 2010, at 15:06, Marek Majkowski wrote:

Marek Majkowski

Sep 15, 2010, 10:11:36 AM
to romary...@gmail.com, rabbitmq...@lists.rabbitmq.com
On Tue, Sep 14, 2010 at 09:07, romary...@gmail.com
<romary...@gmail.com> wrote:
>> The flow control was heavily modified between 1.8.1 and 2.0.0. In summary:
>> - 1.8.1 - we have send Channel.flow AMQP message to everyone once
>>  rabbit reached memory limit
>> - 2.0.0 - once we reach memory limit, the connections from which we hear
>>  publishes are stopped temporarily. We stop receiving bytes from tcp
>> sockets.
>>  That 'stop' shouldn't take too long, as data should be swapped out to
>> disk
>>  and memory pressure will drop pretty quickly.
>
> Do you mean that in 2.0.0 the Channel.flow AMQP message is no longer sent to
> the producers that are stopped temporarily? That would explain why:
>        1) Channel.publish() can block on the client side when the
> broker stops reading from the socket;
>        2) FlowListener.handleFlow() is no longer invoked on the registered
> listener when the alarm handler is set or cleared.
> Are my deductions right?

Yes. You will never hear "FlowListener.handleFlow()" and it may be possible for
channel.publish to block (though I would need to consult the sources
to be sure).

> Do you have any figures to quantify "shouldn't take too long"? Are there
> some test reports available about that major change?

That's the question we really avoided :) Oh, well. No, we haven't done any
'real' tests, it's only based on our intuition and experience. In most
cases the blocking goes away pretty quickly - after 30 seconds usually,
about two minutes sometimes.

But it is possible to create a very pessimistic environment in which the
memory usage will not drop - and the connection could be stuck for a long time.
(though it's pretty unlikely).

> Sorry If I wasn't clear on the previous post ,we are already in 2.0.0 for
> both broker and
> client library.

Good.

Well, under the current implementation of flow control - yes, as it's the whole
TCP/IP connection that gets blocked. It will affect any command, including
basic.ack.

What we usually propose is to use one TCP/IP connection for receiving
and a different one for publishing. Under memory pressure we only block the publishers.
By using a separate connection only for receiving, you can be sure it will
never be blocked.
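
Something along these lines (just a sketch, I haven't compiled it; the host,
exchange and queue names are placeholders):

import com.rabbitmq.client.*;
import java.io.IOException;

public class SplitConnections {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("broker-host");

        // Connection used only for publishing: this is the one the broker
        // may stop reading from when the memory alarm is set.
        Connection pubConn = factory.newConnection();
        Channel pubCh = pubConn.createChannel();

        // Connection used only for consuming and acking: never blocked
        // by the memory alarm, so acks always get through.
        Connection subConn = factory.newConnection();
        Channel subCh = subConn.createChannel();
        subCh.basicQos(1);
        subCh.basicConsume("testQ", false, new DefaultConsumer(subCh) {
            @Override
            public void handleDelivery(String tag, Envelope env,
                                       AMQP.BasicProperties props, byte[] body)
                    throws IOException {
                getChannel().basicAck(env.getDeliveryTag(), false);
            }
        });

        pubCh.basicPublish("testX", "testK", null, "hello".getBytes());
    }
}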

I'm afraid the old blog post is the only source:
http://www.lshift.net/blog/2009/11/30/introducing-rabbitmq-status-plugin

romary...@gmail.com

Sep 15, 2010, 2:11:47 PM
to rabbitmq...@lists.rabbitmq.com
On 15 Sep 2010, at 16:11, Marek Majkowski wrote:

> On Tue, Sep 14, 2010 at 09:07, romary...@gmail.com
> <romary...@gmail.com> wrote:
>>> The flow control was heavily modified between 1.8.1 and 2.0.0. In summary:
>>> - 1.8.1 - we have send Channel.flow AMQP message to everyone once
>>>   rabbit reached memory limit
>>> - 2.0.0 - once we reach memory limit, the connections from which we hear
>>>   publishes are stopped temporarily. We stop receiving bytes from tcp sockets.
>>>   That 'stop' shouldn't take too long, as data should be swapped out to disk
>>>   and memory pressure will drop pretty quickly.
>>
>> Do you mean that in 2.0.0 the Channel.flow AMQP message is no longer sent to
>> the producers that are stopped temporarily? That would explain why:
>>        1) Channel.publish() can block on the client side when the
>> broker stops reading from the socket;
>>        2) FlowListener.handleFlow() is no longer invoked on the registered
>> listener when the alarm handler is set or cleared.
>> Are my deductions right?
>
> Yes. You will never hear "FlowListener.handleFlow()" and it may be possible for
> channel.publish to block (though I would need to consult the sources
> to be sure).

It seems to me that the FlowListener interface is likely to be deprecated then, isn't it?
It does not really matter much for us anyway, since we were on the wrong track using it.
Does this new implementation keep the broker compliant with the specification, then?


>> Do you have any figures to quantify "shouldn't take too long"? Are there
>> some test reports available about that major change?
>
> That's the question we really avoided :) Oh, well. No, we haven't done any
> 'real' tests, it's only based on our intuition and experience. In most
> cases the blocking goes away pretty quickly - after 30 seconds usually,
> about two minutes sometimes.

This would be acceptable for our needs, but only if we can somehow guarantee that this is an upper bound!

> But it is possible to create a very pessimistic environment in which the
> memory usage will not drop - and the connection could be stuck for a long time.
> (though it's pretty unlikely).

... Not that unlikely, considering my little experiment with the MultiCastMain sample (see my previous reply for details):
I get a blocked connection 100% of the time.
What would be, based on your knowledge and intuition, "a very pessimistic environment in which the memory usage will not drop"?

I think the experiment I did with MultiCastMain is perhaps the beginning of an answer to that question, although I would
never have imagined that a single producer could have the power to flood the broker like that.
Weren't channels designed for this? In our environment, we have (naively?) considered using channels to
separate message production from consumption.
Since we are targeting 10,000 peers doing both production and consumption, multiplying the number of
connections by 2 is not negligible at all in terms of scalability.
Moreover, as I reported earlier, we use SSL to authenticate the broker, and we are still unclear about the memory leaks
induced by SSL connections. Doubling the number of connections would not be negligible in terms of memory occupation either.
In conclusion, we are not likely to implement our peers using 2 connections to the same broker.
What would you recommend to us then? And could you give us a better understanding of the use cases for channels?
Regarding the status plugin: take it easy, it was really straightforward to install after all. For those who run into issues, just put
- mochiweb-2.0.0.ez
- rabbitmq-mochiweb-2.0.0.ez
- rabbit_status-2.0.0.ez
into /usr/lib/rabbitmq/lib/rabbitmq_server-2.0.0/plugins, and voila!

B.R,

Romary.

Marek Majkowski

Sep 16, 2010, 6:31:28 AM
to romary...@gmail.com, rabbitmq...@lists.rabbitmq.com
On Wed, Sep 15, 2010 at 14:45, romary...@gmail.com
<romary...@gmail.com> wrote:
> since I can not find out the reasons of our problems yet, I've decided to
> perform some more basics tests.

Hurray!

> To step in the memory limitations faster, I 've decreased the memory
> watermark threshold to .05 (5%)
> This corresponds to 50 MB on the host we are using for this test as shown on
> the rabbit.log file upon startup of broker :
> =INFO REPORT==== 15-Sep-2010::12:36:15 ===

> Memory limit set to 50MB.

Hooa. 50 megs is _really_ small. Probably more than half of it is taken
by Erlang internals; really not much is left for Rabbit!
It's hard to say what the minimal sensible memory allowance is, but I'd
guess not less than 150 megs, probably even 250 megs if you do anything
non-trivial.

In your 50 meg setup you may run into a situation where, once the memory
alarm goes on, it _never_ goes off. That would mean you won't be able to
publish any messages.


> Finally the broker status description (rabbitmqctl status)
> [{running_applications,[{rabbit_status,"RabbitMQ Status Page","0.01"},
>                         {rabbit,"RabbitMQ","2.0.0"},
>                         {ssl,"Erlang/OTP SSL application","3.10.7"},
>                          {public_key,"Public key infrastructure","0.4"},
>                          {mnesia,"MNESIA  CXC 138 12","4.4.12"},
>                        {os_mon,"CPO  CXC 138 46","2.2.4"},
>                          {sasl,"SASL  CXC 138 11","2.1.8"},
>                          {rabbit_mochiweb,"RabbitMQ Mochiweb
> Embedding","0.01"},
>                          {mochiweb,"MochiMedia Web Server","1.3"},
>                          {crypto,"CRYPTO version 1","1.6.3"},
>                          {inets,"INETS  CXC 138 49","5.2"},
>                          {stdlib,"ERTS  CXC 138 10","1.16.4"},
>                          {kernel,"ERTS  CXC 138 10","2.13.4"}]},
>  {nodes,[{disc,['rabbit@mystic-buntu']}]},
>   {running_nodes,['rabbit@mystic-buntu']}]

OK. For testing and benchmarking I'd recommend removing the status plugin.
The status plugin (if you look at the results) constantly queries queues, which
may in some cases make it less likely that they get GC'd/hibernated.

The new management plugin is better and shouldn't affect the GC that much
(though for benchmarking it's still better to disable all the plugins).

> - The last lines in the broker log are :
> =INFO REPORT==== 15-Sep-2010::14:53:44 === vm_memory_high_watermark set.
> Memory used:53650536 allowed:52556595 =INFO REPORT==== 15-Sep-2010::14:53:44
> === alarm_handler: {set,{vm_memory_high_watermark,[]}}
> From that point, the producer is blocked without any possible recovery. As
> the flow control is designed in v2.0.0, I would have expected the producer
> to be released thanks to the swapping of messages to disk.

Yes it will! But only if it has enough memory to breathe!

What has happened is that internal Erlang stuff took most of the memory,
and even if we write everything to disk we won't be able to keep the
memory usage
under the threshold.

> I think this behaviour is not expected and maybe that could be due to a bug
> somewhere, since it si fully reproductible on my configuration.
> The fact that the memory occupation never falls below the threshold after
> message are removed from the queue is particularly strange and
> unwilling from my point of view. It think that this simple test can points
> out an issue that would explains the problems that I mentioned in the
> previous messages, but not sure about it.
> I am sorry for this quite long message, but I thought that the more details
> you get, the better.

A few suggestions:
- Give rabbit more memory (see the config sketch below). How much memory? That depends on your setup.
  If you run 10k connections you need enough memory to handle those
  connections, plus some memory to keep messages and queues.
  If the connections eat up all the memory, rabbit won't allow
  you to publish.
- Disable the status plugin (you can try the management plugin instead).
- Try the new Erlang, R14B. They claim to have fixed some memory leaks in SSL,
  so it may help.
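
For example, simply reverting the artificially low watermark in your
rabbitmq.config gives rabbit room to breathe (0.4 of physical RAM is, if I
remember correctly, the shipped default); keep your ssl_listeners/ssl_options
entries alongside it:

[
  {rabbit, [
    {vm_memory_high_watermark, 0.4}
  ]}
].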

Cheers!
Marek

Marek Majkowski

Sep 16, 2010, 8:49:39 AM
to romary...@gmail.com, rabbitmq...@lists.rabbitmq.com
On Wed, Sep 15, 2010 at 19:11, romary...@gmail.com
<romary...@gmail.com> wrote:
> Yes. You will never hear "FlowListener.handleFlow()" and it may be possible
> for
> channel.publish to block (though I would need to consult the sources
> to be sure).
>
> It seems to me that the FlowListener interface is likely to be deprecated
> then, isn't it?

It's more complex than that, but yes, you won't hear anything on that interface.

> It does not really matter much for us anyway, since we were on the wrong
> track using it.
> Does this new implementation keep the broker compliant with the
> specification, then?

Yes, the spec doesn't force us to send channel.flow. We implemented that
for a while but realized that it doesn't solve our problems.

> This would be acceptable for our needs, but only if we can somehow
> guarantee that this is an upper bound!

Optimistically, the upper bound is not more than your memory usage
divided by the disk throughput.

> But it is possible to create a very pessimistic environment in which the
> memory usage will not drop - and the connection could be stuck for a long
> time.
> (though it's pretty unlikely).

> ... Not that much unlikely, considering my little playing with the
> MultiCastMain sample (see my previous reply about it for details).
> I get 100 % times blocked connection.
> What would be, based on your knowledge and your intuition, "a very
> pessimistic environment in which the memory usage will not drop" ?
> I think that the experimentation I've done on the MultiCastMain is maybe a
> beginning of an answer for that question, although I would
> never have considered that a single producer could have such power to flood
> the broker.

Okay. The memory can stay high for a lot of reasons. If the metadata
that rabbit never releases is using more memory than the threshold,
rabbit will just get stuck.

Next thing: remember, we don't control how Erlang eats memory, and it
has pretty complex GC and memory allocation mechanisms.

If you think you have enough memory for all the connections,
queues, exchanges and bindings, plus some memory for the messages,
and you still hit the limit and get stuck - feel free to tune
Erlang GC internals:
http://www.erlang.org/doc/man/erts_alloc.html

> Weren't Channel design for that ? In our environment, we have (naively ?)
> considered the use of Channel to
> separate the message production from the consumption.
> Since we are targeting 10 000 peers doing both production and consumption,
> the fact of multiplying the number of
> connections by 2 is not negligible at all, considering scalability.
> Moreover, as I reported later on, we use SSL to authenticate the broker, and
> we are still unclear about memory leaks
> induce by SSL connections. Doubling the number of connections will not be
> negligible at all considering memory occupation either.
> In conclusion, we are not likely to implement our peers using 2 connections
> for the same broker.
> What would you recommend to us then ? And could you give us a better
> understanding on the use case of channels ?

Yes, channels were designed exactly for that. On the other hand, AMQP
has a few pretty serious issues. For example, when you open a channel
you're free to publish a message, and the broker can't 'refuse' to accept
a message. Channel.flow can be sent from the broker to the client,
but only *after* the channel is opened. So there is a window in which
you can just publish (after channel.open, before channel.flow). Sorry, there
just isn't any other way of forbidding publishes than stopping the whole
connection. I don't like it either, but it's the only way.


Cheers,
Marek

romary...@gmail.com

Sep 17, 2010, 10:32:09 AM
to rabbitmq...@lists.rabbitmq.com
There are still some points I don't understand quite well about flow
control.

As Marek said, in versions before 2.0.0 the broker sends a Channel.flow
AMQP message to everyone once rabbit reaches the memory limit. However,
I still don't understand how this came to block production in that
version.

At first, I thought that the client application should implement its
own pause/resume logic, and that the FlowListener interface would be the
way to do that. I realized only later that the producer was paused
even without the use of a FlowListener, for instance when using the
MultiCastMain sample from Java client 1.7.2.

Can you give us hints about how production was actually paused,
other than by the broker stopping reading from the socket?
Does the broker close the connection or only the channel if a content-
bearing message is sent after the channel.flow?

What is the meaning of the "blocked" and "blocking" statuses that we can
see when listing connections? I have often noticed that
the producer's connection being in status "blocked" leads to the
consumer's connection being "blocking".

Sharing the same connection between producer and consumer, through the
use of channels, may not be safe; that is very sad!

Best regards,

Romary.

On 16 Sep 2010, at 14:49, Marek Majkowski wrote:

Matthew Sackman

Sep 18, 2010, 6:30:16 PM
to rabbitmq...@lists.rabbitmq.com
On Fri, Sep 17, 2010 at 04:32:09PM +0200, romary...@gmail.com wrote:
> Can you give us hints about how production was actually paused,
> other than by the broker stopping reading from the socket?

The client itself understands the channel.flow messages that come from
the broker. It therefore blocks any publishes the application attempts,
waiting for permission from the broker to resume publishing.

> Does the broker close the connection or only the channel if a
> content-bearing message is sent after the channel.flow?

I can't remember. I would think it should be channel only on the grounds
that other channels that may be consuming messages have done nothing
wrong. However, for that to happen is basically a client error: the
client itself should understand the channel flow messages and make
sending content-bearing messages a blocking operation at that point.

> What is the meaning of the "blocked" and "blocking" statuses that we can
> see when listing connections? I have often noticed that
> the producer's connection being in status "blocked" leads to the
> consumer's connection being "blocking".

That will mean that channel.flow{active=false} has been sent to the
client, and then that channel.flow_ok has been received back by the
server.

> Sharing the same connection between producer and consumer, through the
> use of channels, may not be safe; that is very sad!

Well, it's fine provided you have a client that works properly and you
understand that, at times, publishing a content-bearing message may be a
blocking operation, and code accordingly.

Matthew

mabrek

Sep 24, 2010, 11:01:30 AM
to rabbitmq...@lists.rabbitmq.com
On Fri, Sep 10, 2010 at 5:06 PM, Marek Majkowski <maj...@gmail.com> wrote:
> The flow control was heavily modified between 1.8.1 and 2.0.0. In summary:
>  - 1.8.1 - we have send Channel.flow AMQP message to everyone once
>   rabbit reached memory limit
>  - 2.0.0 - once we reach memory limit, the connections from which we hear
>   publishes are stopped temporarily. We stop receiving bytes from tcp sockets.
>   That 'stop' shouldn't take too long, as data should be swapped out to disk
>   and memory pressure will drop pretty quickly.

We use channel.basicPublish (Java client) from a web request handler and
need to reply quickly. A hanging basicPublish will ruin our response
time. It would be better to be able to specify a timeout on publishing, or
to have an exception thrown when the server reaches its memory limit.
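
For now, the only workaround we can think of is to bound the wait ourselves
by publishing through an executor, roughly like the sketch below (the
publish may of course still complete on the broker side after the timeout fires):

import com.rabbitmq.client.Channel;
import java.util.concurrent.*;

public class BoundedPublisher {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    // Returns false if the publish did not complete within timeoutMs,
    // e.g. because the broker has blocked the connection.
    public boolean tryPublish(final Channel ch, final String exchange,
                              final String key, final byte[] body, long timeoutMs) {
        Future<Void> f = pool.submit(new Callable<Void>() {
            public Void call() throws Exception {
                ch.basicPublish(exchange, key, null, body);
                return null;
            }
        });
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (Exception e) {
            return false;
        }
    }
}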

Regards,
Anton Lebedevich.

mabrek

Sep 24, 2010, 11:12:10 AM
to Marek Majkowski, rabbitmq...@lists.rabbitmq.com
On Thu, Sep 16, 2010 at 4:49 PM, Marek Majkowski <maj...@gmail.com> wrote:
> Yes, channels were designed exactly for that. On the other hand, AMQP
> has few pretty serious issues. For example when you open channel
> you're free to publish a message. And broker can't 'refuse' accepting
> a message. Channel.flow can be sent from the broker to the client
> but *after* channel is opened. So there is a window in which
> you just can publish (after channel.open before channel.flow). Sorry, there
> just isn't other way of forbidding publishes than by stopping the whole
> connection. I also don't like it, but it's the only way.

We do need a way to check the stopped state of a connection before sending messages.
Please give the client some hint to decide whether it can send messages
without hanging.
Something like a 'boolean isPublishingAllowed()' method on Channel, or a timeout
parameter on basicPublish, would be fine.

Regards,
Anton Lebedevich.
