Publish messages within a subscribe block

175 views
Skip to first unread message

PJ

unread,
Nov 13, 2012, 7:41:40 PM11/13/12
to ruby...@googlegroups.com
Hello,

Here is what I am doing:

foo_queue.subscribe do |payload|

  # do stuff A
  
  ex.publish(stuff, :routing_key => "fooA")

  # do stuff B

end

From my tests, it seems that the messages on fooA are only sent once the entire block has executed, i.e. after "do stuff B". It is an issue for me since "do stuff B" is time consuming and I need the messages as soon as possible.

Is there a way to do this? Or am I missing something?

Thanks!
PJ

Michael Klishin

unread,
Nov 13, 2012, 7:56:08 PM11/13/12
to ruby...@googlegroups.com


2012/11/14 PJ <wamr...@googlemail.com>

From my tests, it seems that the messages on fooA are only sent once the entire block has executed, i.e. after "do stuff B". It is an issue for me since "do stuff B" is time consuming and I need the messages as soon as possible.

Is there a way to do this? Or am I missing something?

this means that

# do stuff A

or

# do stuff B

blocks and subscribe blocks are executed on the EventMachine event loop thread. Use EventMachine.defer
to execute them in a thread pool or make sure they do not block. This is a known limitation of all event loops.

--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin

PJ

unread,
Nov 13, 2012, 8:16:45 PM11/13/12
to ruby...@googlegroups.com
Thanks a lot Michael. Defer is really helpful!

PJ

unread,
Nov 22, 2012, 6:30:31 AM11/22/12
to Ruby AMQP ecosystem
Everything has been working fine with defer, until I started to load
test my system. Just to give a bit more details, here is what I am
doing (simplified, only showing messages related things):

EventMachine.run do
foo_queue.subscribe do |payload|
... do stuff
ex.publish(stuff, :routing_key => "fooA")

operation = proc {
... do stuff including access to the DB. Returns result ##
Section A
ex.publish(stuff, :routing_key => "fooA")
result
}

callback = proc {|result|
if result
ex.publish(stuff, :routing_key => "fooA") ## Section B
end
}

EventMachine.defer(operation, callback)

end
end

It is difficult to understand exactly what is failing, but I am
getting errors such as:

In Section A: <internal:prelude>:8:in `lock': deadlock detected
(fatal)

In Section B: /usr/lib64/ruby/gems/1.9.1/gems/amq-client-0.9.8/lib/amq/
client/async/adapter.rb:247:in `send_frame': Trying to send frame
through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:
0x007f3b049fbd98 @payload="\x00<
\x00(\x00\x00\x00\x1E941359_test_machine\x00", @channel=1>
(AMQ::Client::ConnectionClosedError)


It "looks like" (I don't know to be honest) a thread safety issue, but
I am not sure how to address this.


Thanks a lot
PJ


On Nov 14, 1:16 am, PJ <wamre...@googlemail.com> wrote:
> Thanks a lot Michael. Defer is really helpful!
>
>
>
>
>
>
>
> On Wednesday, November 14, 2012 12:56:49 AM UTC, Michael Klishin wrote:
>
> > 2012/11/14 PJ <wamr...@googlemail.com <javascript:>>

Laust Rud Jacobsen

unread,
Nov 22, 2012, 6:36:35 AM11/22/12
to ruby...@googlegroups.com
Quick note: once you use .defer, that code running in the defer-block
is running on a separate thread. To execute code back on the main
reactor thread, put that code in a EM.next_tick block. Maybe that's
what you're missing, only accessing the AMQP objects on the main
reactor thread.

--
Laust Rud Jacobsen
la...@object.io | blog: object.io | skype:object.io | +45 2588 4665
> --
> Documentation guides: http://bit.ly/rubyamqp
> Code examples: http://bit.ly/amq-gem-examples
> API reference: http://bit.ly/mDm1JE
>
> Drop by #rabbitmq on irc.freenode.net
> Bug tracker: https://github.com/ruby-amqp/amqp/issues
>
> Post to the group: ruby...@googlegroups.com | unsubscribe: ruby-amqp+...@googlegroups.com

Michael Klishin

unread,
Nov 22, 2012, 8:17:03 AM11/22/12
to ruby...@googlegroups.com


2012/11/22 PJ <wamr...@googlemail.com>

It "looks like" (I don't know to be honest) a thread safety issue, but
I am not sure how to address this.

Not necessarily. It may be an uncaught exception (when connection went down, AMQP::Exchange#publish raised an exception). Channel synchronize publishing but it is still
highly recommended not sharing them between threads. See RabbitMQ log, there has to be
something about why the connection was closed.

PJ

unread,
Nov 22, 2012, 9:34:11 AM11/22/12
to Ruby AMQP ecosystem
Thanks guys.

Laust: thing is my code takes time to execute, so I'd rather have it
executed in a background thread using defer rather than in the main
loop.

Michael: you are right. It looks like messages got interleaved. The
log shows:

connection <0.3418.0> (running), channel 1 - error:
{amqp_error,unexpected_frame,
"expected content body, got non content body frame
instead",
'basic.publish'}

I understand what you are saying regarding not sharing channels
between threads, but then I am not sure how to do that if you want to
use defer (considering the block might be executed in any of the
pool's threads). Does it mean one should not publish anything within a
deferred block, and instead find a way to "postpone" it to the
callback?

On Nov 22, 1:17 pm, Michael Klishin <michael.s.klis...@gmail.com>
wrote:
> 2012/11/22 PJ <wamre...@googlemail.com>

Michael Klishin

unread,
Nov 22, 2012, 9:48:17 AM11/22/12
to ruby...@googlegroups.com


2012/11/22 PJ <wamr...@googlemail.com>

connection <0.3418.0> (running), channel 1 - error:
{amqp_error,unexpected_frame,
            "expected content body, got non content body frame
instead",
            'basic.publish'}

I understand what you are saying regarding not sharing channels
between threads, but then I am not sure how to do that if you want to
use defer (considering the block might be executed in any of the
pool's threads).

What version do you use? We added a mutex that synchronizes publishing
months ago. Do you have wireshark installed by any chance? Could you
record a session for me?
 
Does it mean one should not publish anything within a
deferred block, and instead find a way to "postpone" it to the
callback?

No, that would make things needlessly difficult. I would just open two channels,
one only used for publishing, so that the publishing channel is in the scope of what you
are deferring. Simply using separate channels is all you need.

I am very curious under what circumstances with recent versions you are getting
frames interleaved on the same channel.

François Beausoleil

unread,
Nov 22, 2012, 2:42:21 PM11/22/12
to ruby...@googlegroups.com

Le 2012-11-22 à 09:34, PJ a écrit :

> Laust: thing is my code takes time to execute, so I'd rather have it
> executed in a background thread using defer rather than in the main
> loop.

What Laust meant is this:

EM.defer do
# do stuff
EM.next_tick do
exchange.publish(...)
end
end

That way, the publication always happens in the same thread, namely the EventMachine one.

Bye!
François

PJ

unread,
Nov 28, 2012, 6:49:09 AM11/28/12
to ruby...@googlegroups.com
Guys, I meant to apologize it took me a lot of time to get back to you. But this is proving very challenging for me. And again, just to be clear: everything is absolutely fine under "normal" use. It only starts to fail under heavy load.

Thank you François. Indeed, I misunderstood what Laust had said. I am trying this solution but it still fails from time to time.

Michael, sorry it took me a lot of time to get back to you: it is hard to reproduce this specific error. It happens very randomly. I am using:
- services side:
amq-client (0.9.9)
amq-protocol (0.9.5)
amqp (0.9.8)
- client side (Rails 3):
bunny (0.8.0)

(unfortunately I can't install wireshark on these boxes at present)


I have now refactored my code to try and combine Laust's and your suggestions. I am still getting random problems under load though. So just two questions:

- is it "bad" to nest deferred blocks (I am now making sure all "publish" actions only happen in the callbacks)?

EventMachine.run do
  
  operation = proc {
    foo1()
  }
  callback = proc {|result|
    foo2(result)

    operation2 = proc {
      foo3(result)
    }
    callback2 = proc {|result2|
      foo4(result2)
    }
    EventMachine.defer(operation2, callback2)

  }
  EventMachine.defer(operation, callback)
  
end

- Michael, when you say "I would just open two channels, one only used for publishing", I am not sure I follow. Are you saying I should do something like:

  EventMachine.run do
    connection = AMQP.connect(config[:amqp_server])
    read_channel  = AMQP::Channel.new(connection)
    read_ex = read_channel.default_exchange
    write_channel  = AMQP::Channel.new(connection)
    write_ex = write_channel.default_exchange
    
    ...
  end
  
and make sure all the "subscribe" are bound to the read_channel, and all the "publish" to the write_channel? Is it still recommended if I make sure all the "publish" actions happen in the reactor loop?


Thanks
PJ

Michael Klishin

unread,
Nov 28, 2012, 7:39:10 AM11/28/12
to ruby...@googlegroups.com
2012/11/28 PJ <wamr...@googlemail.com>

- Michael, when you say "I would just open two channels, one only used for publishing", I am not sure I follow. Are you saying I should do something like:

  EventMachine.run do
    connection = AMQP.connect(config[:amqp_server])
    read_channel  = AMQP::Channel.new(connection)
    read_ex = read_channel.default_exchange
    write_channel  = AMQP::Channel.new(connection)
    write_ex = write_channel.default_exchange
    
    ...
  end

Yes.

I am curious if using next Bunny release may be easier for you. It does not have this problem because consumer handlers
are executed in a separate thread from network activity. See

PJ

unread,
Nov 28, 2012, 9:08:13 AM11/28/12
to ruby...@googlegroups.com
Michael,

I will try this. I don't know if it makes any sense, but indeed, it seems the problems stem from the client side (which uses Bunny), not the services (which use AMQP). Indeed, I am doing two types of load tests:

a- from within a Rails console (i.e. pure AMQP messaging, no http requests).
b- using Celerity (i.e. "real" browsers and requests)

With a-, I can load the services like crazy and don't get a single issue.
With b-, some of my services start to fail randomly.

On this, is there a recommended way to use Bunny from Rails / Passenger? Right now, I am doing it very bluntly: every time the client app needs to send/receive a message, I start a connection, then close it. It means a lot of connections/disconnections.


PJ

Michael Klishin

unread,
Nov 28, 2012, 9:13:44 AM11/28/12
to ruby...@googlegroups.com


2012/11/28 PJ <wamr...@googlemail.com>

On this, is there a recommended way to use Bunny from Rails / Passenger? Right now, I am doing it very bluntly: every time the client app needs to send/receive a message, I start a connection, then close it. It means a lot of connections/disconnections.

Connect in an initializer. Bunny 0.9 does not try to hide channels from you so you will have to also open
a channel. The same goes for any other application, more or less.

Bunny 0.9 does not currently have network partition handling, but that is on top of the list.

PJ

unread,
Nov 28, 2012, 11:53:29 AM11/28/12
to ruby...@googlegroups.com
Michael, thank you SO much.

I replaced Bunny 0.8.0 with 0.9.0.pre1 and I don't see any errors anymore under load. Everything seems fine. Even all the "begin / rescue / retry" blocks I had in my code don't seem necessary anymore.

I will report back if further testing yields errors, but so far it is all good.

And the test suite is super helpful to understand the new API.

Thanks a million
PJ
Reply all
Reply to author
Forward
0 new messages