- Our application involves 10,000 peers periodically producing messages
to a single queue. This queue is consumed asynchronously by another peer.
- All peers are written in Java, using amqp-client-1.8.1.
- The production rate of a single peer is 4 messages / hour
- We can simulate a time-consuming task in the consumer callback, to
emulate a faster or slower consumer.
- We use an SSL certificate on the broker side so that peers can
authenticate the broker.
- We have noticed that the use of SSL has a dramatic impact on the
memory occupied by the RabbitMQ process.
- We keep open the possibility of adding a second consumer to
load-balance the consumption of messages.
- We use a prefetch window of 1 message to enable credit-based
flow control in this case (see the consumer sketch after this list).
- We have set up several monitoring indicators on the broker side:
  - virtual memory occupied by the RabbitMQ process
  - CPU load
  - queue depth
  - disk usage
- Our test scenario is as follows:
  - during the first 5 hours, all peers join the party (producers and
consumers)
  - after 5 hours, producers stop publishing messages
  - the test goes on for a configurable duration to allow the consumer
to finish emptying the queue
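To make the setup concrete, here is a minimal sketch of our consumer side.
The queue name and host are made up for the example, the queue is assumed to
already exist, and the method signatures shown are those of the newer-style
Java client; they differ slightly between 1.8.1 and 2.0.0.

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class SlowConsumerSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("broker-host");            // hypothetical broker host
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // Prefetch window of 1: at most one unacknowledged message in flight.
        channel.basicQos(1);

        channel.basicConsume("peersQueue", false /* manual ack */,
            new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    try {
                        // Simulated time-consuming task; tune the sleep to
                        // emulate a faster or slower consumer.
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    // Ack one message at a time (multiple == false).
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
    }
}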
During long-running tests, we have encountered strange behaviour due
to flow control:
The queue depth starts to increase linearly for about 2 hours; this
is consistent, since the message throughput of the single consumer
is not enough to absorb the message ingress. Memory occupation grows quickly
as well, until the memory watermark is reached on the broker side.
From that point, the producers are indeed paused, as a flow control
request has been issued by the broker, but the consumer seems to be
blocked as well. The queue depth stays flat at its peak value until the end of
the test, even after memory occupation drops below the threshold.
By registering the FlowListener callback, we have noticed that not all
of the producers are notified every time the alarm handler is set.
Does this mean that the broker applies some heuristic to avoid
blocking everybody every time?
Or does it mean that some of the channels have somehow been
blacklisted by the broker?
Could anybody explain how the blocking of the consumer is supposed to be
implemented?
Does the call to Channel.publish() somehow block the connection
thread?
How come the consumer connection is also blocked?
Could implementing the FlowListener interface help to handle
flow control requests?
(At first glance I thought that flow control had to be implemented
by hand using this interface,
but after looking at http://hopper.squarespace.com/blog/2008/11/9/flow-control-in-rabbitmq.html
it seems that this is not the case after all.)
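For reference, the way we register the listener is roughly the following
(a sketch only; setFlowListener is the registration method the client of that
era appears to expose, so treat the exact name as an assumption):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.FlowListener;

public class FlowListenerRegistration {
    // Sketch: attach a flow listener to an already created publishing channel.
    // The registration method name (setFlowListener) is assumed from the
    // client versions discussed here and may differ in other releases.
    public static void register(Channel channel) {
        channel.setFlowListener(new FlowListener() {
            public void handleFlow(boolean active) {
                // active == false: the broker asks us to pause publishing
                // active == true : the broker allows publishing again
                System.out.println("channel.flow received, active=" + active);
            }
        });
    }
}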
Best regards,
Romary.
- The previous situation was encountered with a RabbitMQ 1.8.1
broker.
I've started playing a bit with the latest release, 2.0.0, and I'm
afraid it looks like there are some regressions, or at least some
semantic changes.
I've just modified the MultiCastMain sample provided with the Java
client library to add a FlowListener on both the producer and the
consumer, registering the flow listener on their respective Channels.
It looks like no listener is called back when the alarm handler is
set or cleared, while the producers are still paused / resumed
as they are supposed to be.
Does this reflect a change in the inner flow control mechanism of the
message broker, or a regression in the Java client stack?
Furthermore, what about the deployment of the client library to the
Maven repository?
On 8 Sep 2010, at 11:33, romary...@gmail.com wrote:
First, thanks a lot for the feedback. For example, the information about
SSL memory
usage is indeed very interesting. (If that is a big problem for you,
you can always
fall back to the old technique of using stunnel.)
Flow control was heavily modified between 1.8.1 and 2.0.0. In summary:
- 1.8.1 - we sent the Channel.flow AMQP message to everyone once
Rabbit reached the memory limit
- 2.0.0 - once we reach the memory limit, the connections from which we hear
publishes are stopped temporarily: we stop receiving bytes from their TCP sockets.
That 'stop' shouldn't last too long, as data should be swapped out to disk
and memory pressure will drop pretty quickly.
On Wed, Sep 8, 2010 at 11:49, Romary Kremer <romary...@gmail.com> wrote:
> I've started playing a bit with the latest release, 2.0.0, and I'm afraid
> it looks like there are some regressions, or at least some semantic
> changes.
It's best if you upgrade both the server and the client library. Do you have any
particular problems? A lot was changed in 2.0.0, but we think it's fully
functional. If you find something that blocks you from migrating, you
could report a bug.
> It looks like no listener is called back when the alarm handler is set or
> cleared, while the producers are still paused / resumed
> as they are supposed to be.
Interesting. Maybe we have a race there? Or maybe you're blocking
the main java client thread? (nothing blocking should be done from
the main thread)
>> During long-running tests, we have encountered strange behaviour due to
>> flow control:
>>
>> The queue depth starts to increase linearly for about 2 hours; this is
>> consistent, since the message throughput of the single consumer
>> is not enough to absorb the message ingress. Memory occupation grows quickly as
>> well, until the memory watermark is reached on the broker side.
Are you sure your consumer is ACK-ing the messages it received?
>> From that point, the producers are indeed paused, as a flow control request
>> has been issued by the broker, but the consumer seems to be blocked
>> as well. The queue depth stays flat at its peak value until the end of the
>> test, even after memory occupation drops below the threshold.
That's how 1.8.1 behaves. In 2.0.0 we introduced swapping big queues out
to disk, so memory usage shouldn't depend on the queue size.
>> By registering the FlowListener callback, we have noticed that not all of
>> the producers are notified every time the alarm handler is set.
>> Does this mean that the broker applies some heuristic to avoid blocking
>> everybody every time?
>> Or does it mean that some of the channels have somehow been blacklisted by
>> the broker?
No, in 1.8.1 the broker should send 'channel.flow' to all the channels.
>> Could anybody explain how the blocking of the consumer is supposed to be
>> implemented?
The best description is probably here:
http://www.rabbitmq.com/extensions.html#memsup
But it covers 2.0.0. I'd suggest an upgrade to 2.0.0 and monitoring
not only queue size but also number of unacknowledged messages
('Msg unack' in status plugin). This number should be near zero.
Cheers,
Marek Majkowski
On 10 Sep 2010, at 15:06, Marek Majkowski wrote:
> Romary,
>
> First, thanks a lot for the feedback. For example, the information about
> SSL memory
> usage is indeed very interesting. (If that is a big problem for you,
> you can always
> fall back to the old technique of using stunnel.)
Stunnel is not being considered for now; we would rather hear your opinion
and knowledge
about potential memory leaks due to the use of SSL (maybe from the Erlang
stack itself).
It seems that memory keeps growing even when the broker has no activity
other than open connections. To confirm this, we ran a test in which we just
open 10,000 connections and do
nothing for 2 hours but observe the memory occupation on the broker
side.
> Flow control was heavily modified between 1.8.1 and 2.0.0. In
> summary:
> - 1.8.1 - we sent the Channel.flow AMQP message to everyone once
> Rabbit reached the memory limit
> - 2.0.0 - once we reach the memory limit, the connections from which we
> hear
> publishes are stopped temporarily: we stop receiving bytes from
> their TCP sockets.
> That 'stop' shouldn't last too long, as data should be swapped out
> to disk
> and memory pressure will drop pretty quickly.
Do you mean that in 2.0.0 the Channel.flow AMQP message is no longer
sent to
the producers that are stopped temporarily? That would explain why
1) Channel.publish() can block on the client side when the
broker stops
reading from the socket
2) FlowListener.handleFlow() is no longer invoked on the registered
listener when
the alarm handler is set or cleared
Are my deductions right?
Do you have any figures to quantify "shouldn't take too long"? Are
there any
test reports available about that major change?
>
>
> On Wed, Sep 8, 2010 at 11:49, Romary Kremer
> <romary...@gmail.com> wrote:
>> I've started playing a bit with the latest release, 2.0.0, and I'm
>> afraid
>> it looks like there are some regressions, or at least some
>> semantic
>> changes.
>
> It's best if you upgrade both the server and the client library. Do you
> have any
> particular problems? A lot was changed in 2.0.0, but we think it's
> fully
> functional. If you find something that blocks you from migrating, you
> could report a bug.
Sorry if I wasn't clear in the previous post; we are already on 2.0.0
for both the broker and the
client library.
>
>> It looks like no listener is called back when the alarm handler
>> is set or
>> cleared, while the producers are still paused / resumed
>> as they are supposed to be.
>
> Interesting. Maybe we have a race there? Or maybe you're blocking
> the main java client thread? (nothing blocking should be done from
> the main thread)
I am quite sure I am not blocking the main thread, nor the
connection thread. All
the message-related logic runs in a dedicated thread (some kind of
ProducerGroup
pool of threads, actually).
Consumer callbacks run within the connection thread, if I refer
to the Javadoc!
With the same code using library version 1.8.1, the callbacks were
invoked when the
alarm handler was set or cleared anyway.
>
>>> During long-running tests, we have encountered strange behaviour
>>> due to
>>> flow control:
>>>
>>> The queue depth starts to increase linearly for about 2 hours;
>>> this is
>>> consistent, since the message throughput of the single consumer
>>> is not enough to absorb the message ingress. Memory occupation grows
>>> quickly as
>>> well, until the memory watermark is reached on the broker side.
>
> Are you sure your consumer is ACK-ing the messages it received?
The consumer callback does ACK messages upon reception, one at a time
(multiple == false).
Is the basic.ack() method liable to be blocked, like
publish(), under flow control?
>
>>> From that point, the producers are indeed paused, as a flow control
>>> request
>>> has been issued by the broker, but the consumer seems to be blocked
>>> as well. The queue depth stays flat at its peak value until the end
>>> of the
>>> test, even after memory occupation drops below the threshold.
>
> That's how 1.8.1 behaves. In 2.0.0 we introduced swapping big
> queues out
> to disk, so memory usage shouldn't depend on the queue size.
Good news, because we had identified 2 scenarios in which memory-based
channel flow
was triggered:
- the use of SSL
- the use of larger messages (4 kB, same ingress)
Now I hope that the message size will no longer be such a determining factor
for flow control, as long
as consumers are able to handle these messages regularly.
>
>>> By registering the FlowListener callback, we have noticed that not
>>> all of
>>> the producers are notified every time the alarm handler is set.
>>> Does this mean that the broker applies some heuristic to avoid
>>> blocking
>>> everybody every time?
>>> Or does it mean that some of the channels have somehow been
>>> blacklisted by
>>> the broker?
>
> No, in 1.8.1 the broker should send 'channel.flow' to all the channels.
Strange, then; there must be something very weird going on.
>
>>> Could anybody explain how the blocking of the consumer is supposed to be
>>> implemented?
>
> The best description is probably here:
> http://www.rabbitmq.com/extensions.html#memsup
>
> But it covers 2.0.0. I'd suggest an upgrade to 2.0.0 and monitoring
> not only queue size but also number of unacknowledged messages
> ('Msg unack' in status plugin). This number should be near zero.
>
We are already on 2.0.0.
Where can I find some documentation about the status plugin, anyway?
Cheers, Romary.
Yes. You will never see "FlowListener.handleFlow()" invoked, and it may be possible for
channel.publish to block (though I would need to consult the sources
to be sure).
> Do you have any figures to quantify "shouldn't take too long"? Are there
> any
> test reports available about that major change?
That's the question we really avoided :) Oh, well. No, we haven't done any
'real' tests, it's only based on our intuition and experience. In most
cases the blocking goes away pretty quickly - after 30 seconds usually,
about two minutes sometimes.
But it is possible to create a very pessimistic environment in which the
memory usage will not drop - and the connection could be stuck for a long time.
(though it's pretty unlikely).
> Sorry if I wasn't clear in the previous post; we are already on 2.0.0 for
> both the broker and
> the client library.
Good.
Well, under the current implementation of flow control - yes, since it's the whole
TCP/IP connection that gets blocked. It will affect any command, including
basic.ack.
What we usually propose is to use one TCP/IP connection for receiving
and a different one for publishing. Under memory pressure we only block the publishers.
By using a separate connection only for receiving, you can be sure it will
never be blocked.
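A rough sketch of that split (names are illustrative only):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SplitConnectionsSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("broker-host");   // hypothetical broker host

        // One connection dedicated to publishing: this is the one the broker
        // may stop reading from under memory pressure.
        Connection publishConnection = factory.newConnection();
        Channel publishChannel = publishConnection.createChannel();

        // A separate connection used only for consuming and acking: it carries
        // no publishes, so it is never blocked.
        Connection consumeConnection = factory.newConnection();
        Channel consumeChannel = consumeConnection.createChannel();

        // publishChannel.basicPublish(...)  - publishes only
        // consumeChannel.basicConsume(...)  - consumes and acks only
    }
}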
I'm afraid the old blog post is the only source:
http://www.lshift.net/blog/2009/11/30/introducing-rabbitmq-status-plugin
Hurray!
> To step into the memory limitations faster, I've decreased the memory
> watermark threshold to .05 (5%).
> This corresponds to 50 MB on the host we are using for this test, as shown in
> the rabbit.log file upon startup of the broker:
> =INFO REPORT==== 15-Sep-2010::12:36:15 ===
> Memory limit set to 50MB.
Hooa. 50 megs is _really_ small. Probably more than half of it is
taken by Erlang
internals; really not much is left for Rabbit!
It's hard to say what the minimal sensible memory allowance is, but I'd guess
not less than 150 megs. Probably even 250 megs if you do anything non-trivial.
In your 50 meg setup you may run into a situation where, once the memory
alarm goes on, it _never_ goes off. That would mean you won't be able
to publish any messages.
> Finally the broker status description (rabbitmqctl status)
> [{running_applications,[{rabbit_status,"RabbitMQ Status Page","0.01"},
> {rabbit,"RabbitMQ","2.0.0"},
> {ssl,"Erlang/OTP SSL application","3.10.7"},
> {public_key,"Public key infrastructure","0.4"},
> {mnesia,"MNESIA CXC 138 12","4.4.12"},
> {os_mon,"CPO CXC 138 46","2.2.4"},
> {sasl,"SASL CXC 138 11","2.1.8"},
> {rabbit_mochiweb,"RabbitMQ Mochiweb
> Embedding","0.01"},
> {mochiweb,"MochiMedia Web Server","1.3"},
> {crypto,"CRYPTO version 1","1.6.3"},
> {inets,"INETS CXC 138 49","5.2"},
> {stdlib,"ERTS CXC 138 10","1.16.4"},
> {kernel,"ERTS CXC 138 10","2.13.4"}]},
> {nodes,[{disc,['rabbit@mystic-buntu']}]},
> {running_nodes,['rabbit@mystic-buntu']}]
OK. For testing and benchmarking I'd recommend removing the status plugin.
The status plugin (if you look at the results) constantly queries queues, which
may in some cases make it less likely for them to be GC'd/hibernated.
The new management plugin is better and shouldn't affect the GC that much
(though for benchmarking it's still better to disable all the plugins).
> - The last lines in the broker log are:
> =INFO REPORT==== 15-Sep-2010::14:53:44 ===
> vm_memory_high_watermark set. Memory used:53650536 allowed:52556595
> =INFO REPORT==== 15-Sep-2010::14:53:44 ===
> alarm_handler: {set,{vm_memory_high_watermark,[]}}
> From that point, the producer is blocked without any possible recovery. Given
> how flow control is designed in v2.0.0, I would have expected the producer
> to be released thanks to the swapping of messages to disk.
Yes it will! But only if it has enough memory to breathe!
What has happened is that internal Erlang stuff took most of the memory,
and even if we write everything to disk we won't be able to keep the
memory usage
under the threshold.
> I think this behaviour is not expected and may be due to a bug
> somewhere, since it is fully reproducible on my configuration.
> The fact that the memory occupation never falls below the threshold after
> messages are removed from the queue is particularly strange and
> unwelcome from my point of view. I think that this simple test may point
> out an issue that would explain the problems I mentioned in the
> previous messages, but I am not sure about it.
> I am sorry for this quite long message, but I thought that the more details
> you get, the better.
A few suggestions:
- Give Rabbit more memory. How much? That depends on your setup.
  If you run 10k connections you need enough memory to handle those
  connections, plus some memory to keep messages and queues.
  If the connections eat up all the memory, Rabbit won't allow
  you to publish.
- Disable the status plugin (you can try the management plugin instead).
- Try the new Erlang, R14B. They claim to have fixed some memory leaks in SSL,
  so it may help.
Cheers!
Marek
It's more complex than that, but yes, you won't hear anything on that interface.
> It does not really matter for us anyway, because we were on the wrong track
> using that.
> Does this new implementation keep the broker compliant with the
> specification, then?
Yes, the spec doesn't force us to send channel.flow. We implemented that
for a while but realized that it doesn't solve our problems.
> This would be acceptable for our needs only if we can somehow guarantee
> that there is an upper bound!
Optimistically, the upper bound is no more than your memory usage
divided by the disk throughput.
> But it is possible to create a very pessimistic environment in which the
> memory usage will not drop - and the connection could be stuck for a long
> time.
> (though it's pretty unlikely).
> ... Not that unlikely, considering my little experiment with the
> MultiCastMain sample (see my previous reply about it for details).
> I get a blocked connection 100% of the time.
> What would be, based on your knowledge and your intuition, "a very
> pessimistic environment in which the memory usage will not drop"?
> I think that the experiment I've done with MultiCastMain is maybe the
> beginning of an answer to that question, although I would
> never have imagined that a single producer could have such power to flood
> the broker.
Okay. The memory can stay high for a lot of reasons. If the metadata
that Rabbit never releases uses more memory than the threshold,
Rabbit will just get stuck.
Next thing: remember, we don't control how Erlang eats memory, and it
has pretty
complex GC and memory allocation mechanisms.
If you think you have enough memory for all the connections,
queues, exchanges and bindings, plus some memory for the messages,
and you still hit the limit and get stuck - feel free to tune
the Erlang GC and allocator internals:
http://www.erlang.org/doc/man/erts_alloc.html
> Weren't Channels designed for that? In our environment, we have (naively?)
> considered using Channels to
> separate message production from consumption.
> Since we are targeting 10,000 peers doing both production and consumption,
> multiplying the number of
> connections by 2 is not negligible at all in terms of scalability.
> Moreover, as I reported earlier, we use SSL to authenticate the broker, and
> we are still unclear about the memory leaks
> induced by SSL connections. Doubling the number of connections would not be
> negligible in terms of memory occupation either.
> In conclusion, we are not likely to implement our peers using 2 connections
> to the same broker.
> What would you recommend to us, then? And could you give us a better
> understanding of the use case for channels?
Yes, channels were designed exactly for that. On the other hand, AMQP
has a few pretty serious issues. For example, when you open a channel
you're free to publish a message, and the broker can't 'refuse' to accept
a message. Channel.flow can be sent from the broker to the client,
but only *after* the channel is opened. So there is a window in which
you can just publish (after channel.open, before channel.flow). Sorry, there
just isn't any other way of forbidding publishes than stopping the whole
connection. I don't like it either, but it's the only way.
Cheers,
Marek
As Marek said, in versions previous to 2.0.0 the broker sends the
Channel.flow AMQP message to everyone once
Rabbit reaches the memory limit. However, I still don't understand how it
came to block production with that version.
At first, I thought that the client application should implement its
own pause/resume logic, and that the FlowListener interface would be the
way to do that. I realized only later that the producer was paused
even without the use of FlowListener, for instance using the
MultiCastMain sample in Java client 1.7.2.
Can you give us hints about how production was actually paused,
other than by the broker stopping reading from the socket?
Does the broker shut down the connection, or only the channel, if a
content-bearing message is sent after the channel.flow?
What is the meaning of the "Blocked" and "Blocking" statuses that we can
see by listing connections? I often noticed that
the producer's connection being in status "Blocked" leads the
consumer's connection to become "Blocking".
Sharing the same connection between producer and consumer, through the
use of Channels, may not be safe; that is very sad!
Best regards,
Romary.
On 16 Sep 2010, at 14:49, Marek Majkowski wrote:
The client itself understands the channel.flow messages that come from
the broker. Thus the client itself blocks any publishes that the
application does, waiting for permission from the broker to resume
publishing.
> Does the broker shut down the connection, or only the channel, if a
> content-bearing message is sent after the channel.flow?
I can't remember. I would think it should be the channel only, on the grounds
that other channels that may be consuming messages have done nothing
wrong. However, for that to happen is basically a client error: the
client itself should understand the channel.flow messages and make
sending content-bearing messages a blocking operation at that point.
> What is the meaning of the "Blocked" and "Blocking" statuses that we can
> see by listing connections? I often noticed that
> the producer's connection being in status "Blocked" leads the connection
> of the consumer to become "Blocking".
That will mean that channel.flow{active=false} has been sent to the
client, and then that channel.flow_ok has been received back by the
server.
> Sharing the same connection between producer and consumer, through the use of
> Channels, may not be safe; that is very sad!
Well, it's fine provided you have a client that works properly and you
understand that at times publishing a content-bearing message may be a
blocking operation, and code accordingly.
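For example, one way to 'code accordingly' (an application-side sketch, not
something the client library provides) is to hand all publishes to a single
dedicated publisher thread through an in-memory queue, so that a publish
blocked by flow control stalls only that thread. Exchange and queue names
below are illustrative only.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.rabbitmq.client.Channel;

// Application-side sketch: all basicPublish calls happen on one dedicated
// thread, so a publish that blocks under flow control only stalls this
// thread, not the threads that hand it work.
public class PublisherThread extends Thread {
    private final Channel channel;   // channel used only for publishing
    private final BlockingQueue<byte[]> outbound = new LinkedBlockingQueue<byte[]>();

    public PublisherThread(Channel channel) {
        this.channel = channel;
    }

    /** Called from application threads; never blocks on the broker. */
    public void enqueue(byte[] body) {
        outbound.add(body);
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                byte[] body = outbound.take();
                // May block while the broker applies flow control.
                channel.basicPublish("", "peersQueue", null, body);
            }
        } catch (Exception e) {
            // Real code would log and handle connection failures here.
        }
    }
}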
Matthew
We use Channel.basicPublish (Java client) from a web request handler and
need to reply quickly. A hanging basicPublish would ruin our response
time. It would be better to be able to specify a timeout on publishing, or to have it throw
an exception when the server reaches its memory limit.
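The closest application-side workaround we can think of is to publish through
a single-threaded executor and bound the wait ourselves; a sketch only (the
timeout abandons the wait, not the publish itself, which may still complete
later on the worker thread; queue name is illustrative):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;

// Sketch of bounding the time a web request waits on basicPublish.
public class BoundedPublish {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    public boolean tryPublish(final Channel channel, final byte[] body, long timeoutMs) {
        Future<Void> future = executor.submit(new Callable<Void>() {
            public Void call() throws Exception {
                channel.basicPublish("", "peersQueue", null, body);
                return null;
            }
        });
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;   // broker appears blocked; answer the web request anyway
        } catch (Exception e) {
            return false;   // interrupted or publish failed
        }
    }
}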
Regards,
Anton Lebedevich.
We do need to check the stopped state of the connection before sending messages.
Please give the client some hint to decide whether it can send messages
without hanging.
Something like a 'boolean isPublishingAllowed()' method on Channel, or a timeout
parameter on basicPublish, would be fine.
Regards,
Anton Lebedevich.