From my tests, it seems that the messages on fooA are only sent once the entire block has executed, i.e. after "do stuff B". It is an issue for me since "do stuff B" is time consuming and I need the messages as soon as possible.
Is there a way to do this? Or am I missing something?
This means that "# do stuff A" or "# do stuff B" blocks, and subscribe
blocks are executed on the EventMachine event loop thread. Use
EventMachine.defer to execute them in a thread pool, or make sure they do
not block. This is a known limitation of all event loops.
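To make the point above concrete, here is a minimal sketch that uses only Ruby's standard library. It is not EventMachine: `run_loop` is a hypothetical stand-in for a single-threaded event loop, and a plain `Thread` stands in for what `EventMachine.defer` does with its thread pool. It only shows why a slow handler delays everything queued behind it, and why offloading it lets the next handler run immediately.

```ruby
# Toy single-threaded loop (assumption: stands in for the reactor).
# Handlers run one after another on the calling thread.
def run_loop(handlers)
  handlers.each(&:call)
end

log = []
workers = []

# Case 1: the slow work runs inline, so the publish has to wait for it.
blocking = [
  proc { sleep 0.1; log << :slow_done },   # "do stuff B" on the loop thread
  proc { log << :publish }                 # delayed by the handler above
]
run_loop(blocking)
inline_order = log.dup

# Case 2: the slow work is handed to another thread (the role defer plays).
log.clear
deferred = [
  proc { workers << Thread.new { sleep 0.1; log << :slow_done } },
  proc { log << :publish }                 # no longer held up
]
run_loop(deferred)
workers.each(&:join)
deferred_order = log.dup

p inline_order
p deferred_order
```

In the first case the publish is recorded after the slow step; in the second it is recorded first, because the loop thread is free again as soon as the slow step has been handed off.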
Everything had been working fine with defer until I started to load
test my system. To give a bit more detail, here is what I am
doing (simplified, showing only the messaging-related parts):
EventMachine.run do
  foo_queue.subscribe do |payload|
    # ... do stuff
    ex.publish(stuff, :routing_key => "fooA")
    operation = proc {
      # ... do stuff including access to the DB. Returns result  ## Section A
      ex.publish(stuff, :routing_key => "fooA")
      result
    }
    callback = proc { |result|
      if result
        ex.publish(stuff, :routing_key => "fooA")  ## Section B
      end
    }
    EventMachine.defer(operation, callback)
  end
end
It is difficult to understand exactly what is failing, but I am
getting errors such as:
In Section A: <internal:prelude>:8:in `lock': deadlock detected (fatal)
In Section B: /usr/lib64/ruby/gems/1.9.1/gems/amq-client-0.9.8/lib/amq/client/async/adapter.rb:247:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007f3b049fbd98 @payload="\x00<\x00(\x00\x00\x00\x1E941359_test_machine\x00", @channel=1> (AMQ::Client::ConnectionClosedError)
It "looks like" (I don't know to be honest) a thread safety issue, but
I am not sure how to address this.
Thanks a lot
PJ
On Nov 14, 1:16 am, PJ <wamre...@googlemail.com> wrote:
> > this means that
> > # do stuff A
> > or
> > # do stuff B
> > blocks and subscribe blocks are executed on the EventMachine event loop
> > thread. Use EventMachine.defer
> > to execute them in a thread pool or make sure they do not block. This is a
> > known limitation of all event loops.
Quick note: once you use .defer, the code in the deferred block runs
on a separate thread. To execute code back on the main reactor thread,
put that code in an EM.next_tick block. Maybe that's what you're
missing: only access the AMQP objects on the main reactor thread.
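The pattern described above can be sketched with the standard library alone. This is a model, not EventMachine: the `ticks` queue is an assumed stand-in for the reactor's next_tick queue, and appending to `published` stands in for `ex.publish`. The worker threads do their work and then only schedule the publish; the single reactor thread is the only one that performs it.

```ruby
require "thread"

ticks = Queue.new      # stand-in for the reactor's next_tick queue
published = []         # [message, publishing thread]; stand-in for ex.publish

# The "reactor": one thread that runs scheduled blocks in order.
reactor = Thread.new do
  while (job = ticks.pop) != :stop
    job.call
  end
end

# Deferred work: each worker computes a result, then schedules the publish
# on the reactor instead of publishing from its own thread.
workers = 3.times.map do |i|
  Thread.new do
    result = "result-#{i}"   # slow work (DB access etc.) would happen here
    ticks << proc { published << [result, Thread.current] }
  end
end

workers.each(&:join)
ticks << :stop
reactor.join

puts published.map(&:first).sort.inspect
puts published.all? { |_, thread| thread == reactor }
```

Because the block is executed by the reactor, `Thread.current` inside it is the reactor thread for every message, whichever worker produced the result.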
> On Nov 14, 1:16 am, PJ <wamre...@googlemail.com> wrote:
>> Thanks a lot Michael. Defer is really helpful!
>> On Wednesday, November 14, 2012 12:56:49 AM UTC, Michael Klishin wrote:
>> > this means that
>> > # do stuff A
>> > or
>> > # do stuff B
>> > blocks and subscribe blocks are executed on the EventMachine event loop
>> > thread. Use EventMachine.defer
>> > to execute them in a thread pool or make sure they do not block. This is a
>> > known limitation of all event loops.
> It "looks like" (I don't know to be honest) a thread safety issue, but
> I am not sure how to address this.
Not necessarily. It may be an uncaught exception (when the connection went
down, AMQP::Exchange#publish raised an exception). Channels synchronize
publishing, but it is still highly recommended not to share them between
threads. See the RabbitMQ log; there has to be something about why the
connection was closed.
Laust: thing is my code takes time to execute, so I'd rather have it
executed in a background thread using defer rather than in the main
loop.
Michael: you are right. It looks like messages got interleaved. The
log shows:
  connection <0.3418.0> (running), channel 1 - error:
  {amqp_error,unexpected_frame,
   "expected content body, got non content body frame instead",
   'basic.publish'}
I understand what you are saying regarding not sharing channels
between threads, but then I am not sure how to do that if you want to
use defer (considering the block might be executed in any of the
pool's threads). Does it mean one should not publish anything within a
deferred block, and instead find a way to "postpone" it to the
callback?
On Nov 22, 1:17 pm, Michael Klishin <michael.s.klis...@gmail.com>
wrote:
> > It "looks like" (I don't know to be honest) a thread safety issue, but
> > I am not sure how to address this.
> Not necessarily. It may be an uncaught exception (when connection went
> down, AMQP::Exchange#publish raised an exception). Channel synchronize
> publishing but it is still
> highly recommended not sharing them between threads. See RabbitMQ log,
> there has to be
> something about why the connection was closed.
> I understand what you are saying regarding not sharing channels
> between threads, but then I am not sure how to do that if you want to
> use defer (considering the block might be executed in any of the
> pool's threads).
What version are you using? We added a mutex that synchronizes publishing
months ago. Do you have Wireshark installed by any chance? Could you
record a session for me?
> Does it mean one should not publish anything within a
> deferred block, and instead find a way to "postpone" it to the
> callback?
No, that would make things needlessly difficult. I would just open two channels,
one only used for publishing, so that the publishing channel is in the scope of
what you are deferring. Simply using separate channels is all you need.
I am very curious under what circumstances with recent versions you are getting
frames interleaved on the same channel.
-- MK
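To illustrate why separate channels avoid the interleaving reported in the broker log, here is a toy model in plain Ruby. It makes several assumptions: `ToyChannel` and `well_formed?` are hypothetical names, not part of the amqp gem, and a publish is simplified to exactly three frames (method, content header, one body frame) that must arrive back to back on a channel. Each publishing thread gets its own channel, so no other thread can slip a frame in between.

```ruby
# Toy channel: records frames in arrival order. Not the amqp gem's channel.
class ToyChannel
  attr_reader :frames

  def initialize
    @frames = []
  end

  # A publish is modelled as three frames that must stay contiguous.
  def publish(tag)
    [:method, :header, :body].each do |kind|
      @frames << [kind, tag]
      Thread.pass   # invite a context switch between frames
    end
  end
end

# True when the frames form complete method/header/body triples,
# each triple belonging to a single publish.
def well_formed?(frames)
  frames.each_slice(3).all? do |m, h, b|
    m && h && b &&
      [m[0], h[0], b[0]] == [:method, :header, :body] &&
      m[1] == h[1] && h[1] == b[1]
  end
end

channels = 4.times.map { ToyChannel.new }   # one channel per publishing thread
threads = channels.each_with_index.map do |ch, i|
  Thread.new { 50.times { |n| ch.publish("t#{i}-#{n}") } }
end
threads.each(&:join)

all_ok = channels.all? { |ch| well_formed?(ch.frames) }
puts all_ok
```

If several threads shared one `ToyChannel` without a lock, a method frame from one publish could land between the frames of another, which is the kind of sequence `well_formed?` rejects.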
Guys, I meant to apologize it took me a lot of time to get back to you. But this is proving very challenging for me. And again, just to be clear: everything is absolutely fine under "normal" use. It only starts to fail under heavy load.
Thank you François. Indeed, I misunderstood what Laust had said. I am trying this solution but it still fails from time to time.
Michael, sorry it took me a lot of time to get back to you: it is hard to reproduce this specific error. It happens very randomly. I am using:
- services side:
    amq-client (0.9.9)
    amq-protocol (0.9.5)
    amqp (0.9.8)
- client side (Rails 3):
    bunny (0.8.0)
(unfortunately I can't install Wireshark on these boxes at present)
I have now refactored my code to try and combine Laust's and your suggestions. I am still getting random problems under load though. So just two questions:
- is it "bad" to nest deferred blocks (I am now making sure all "publish" actions only happen in the callbacks)?
- Michael, when you say "I would just open two channels, one only used for publishing", I am not sure I follow. Are you saying I should do something like:
and make sure all the "subscribe" are bound to the read_channel, and all the "publish" to the write_channel? Is it still recommended if I make sure all the "publish" actions happen in the reactor loop?
On Thursday, November 22, 2012 7:46:24 PM UTC, François Beausoleil wrote:
> Le 2012-11-22 à 09:34, PJ a écrit :
> > Laust: thing is my code takes time to execute, so I'd rather have it
> > executed in a background thread using defer rather than in the main
> > loop.
> What Laust meant is this:
> EM.defer do
>   # do stuff
>   EM.next_tick do
>     exchange.publish(...)
>   end
> end
> That way, the publication always happens in the same thread, namely the
> EventMachine one.
> - Michael, when you say "I would just open two channels, one only used for
> publishing", I am not sure I follow. Are you saying I should do something
> like:
I am curious whether using the next Bunny release would be easier for you.
It does not have this problem because consumer handlers are executed in a
separate thread from network activity. See
I will try this. I don't know if it makes any sense, but indeed, it seems the problems stem from the client side (which uses Bunny), not the services (which use AMQP). Indeed, I am doing two types of load tests:
a- from within a Rails console (i.e. pure AMQP messaging, no http requests).
b- using Celerity (i.e. "real" browsers and requests)
With a-, I can load the services like crazy and don't get a single issue. With b-, some of my services start to fail randomly.
On this, is there a recommended way to use Bunny from Rails / Passenger? Right now, I am doing it very bluntly: every time the client app needs to send/receive a message, I start a connection, then close it. It means a lot of connections/disconnections.
> On this, is there a recommended way to use Bunny from Rails / Passenger?
> Right now, I am doing it very bluntly: every time the client app needs to
> send/receive a message, I start a connection, then close it. It means a lot
> of connections/disconnections.
Connect in an initializer. Bunny 0.9 does not try to hide channels from you,
so you will also have to open a channel. The same goes for any other
application, more or less.
Bunny 0.9 does not currently have network partition handling, but that is
at the top of the list.
-- MK
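As a rough sketch of "connect once, then reuse", the following uses stand-ins only. `FakeConnection` and the `Messaging` module are hypothetical names invented for illustration; with the real gem, the factory would create a Bunny connection instead. The point is just that the connection and channel are opened once at boot, and every later send reuses them instead of connecting and disconnecting per message.

```ruby
# Stand-in for a broker connection; counts how many times one is opened.
class FakeConnection
  @opened = 0
  class << self
    attr_accessor :opened
  end

  def start
    self.class.opened += 1
    self
  end

  def create_channel
    Object.new   # placeholder for a channel object
  end
end

# Hypothetical holder, set up once at boot (for example from an initializer).
module Messaging
  def self.connect!(factory)
    @connection = factory.call.start
    @channel = @connection.create_channel
  end

  def self.channel
    @channel
  end
end

Messaging.connect!(-> { FakeConnection.new })

# Later callers reuse the same channel instead of reconnecting each time.
first = Messaging.channel
second = Messaging.channel

puts FakeConnection.opened
puts first.equal?(second)
```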
I replaced Bunny 0.8.0 with 0.9.0.pre1 and I don't see any errors anymore under load. Everything seems fine. Even all the "begin / rescue / retry" blocks I had in my code don't seem necessary anymore.
I will report back if further testing yields errors, but so far it is all good.
And the test suite is super helpful to understand the new API.