Publish messages within a subscribe block
From: PJ <wamre...@googlemail.com>
Date: Tue, 13 Nov 2012 16:41:40 -0800 (PST)
Local: Tues, Nov 13 2012 7:41 pm
Subject: Publish messages within a subscribe block

Hello,

Here is what I am doing:

foo_queue.subscribe do |payload|

  # do stuff A

  ex.publish(stuff, :routing_key => "fooA")

  # do stuff B

end

From my tests, it seems that the messages on fooA are only sent once the
entire block has executed, i.e. after "do stuff B". It is an issue for me
since "do stuff B" is time consuming and I need the messages as soon as
possible.

Is there a way to do this? Or am I missing something?

Thanks!
PJ


 
From: Michael Klishin <michael.s.klis...@gmail.com>
Date: Wed, 14 Nov 2012 04:56:08 +0400
Local: Tues, Nov 13 2012 7:56 pm
Subject: Re: [ruby-amqp] Publish messages within a subscribe block

2012/11/14 PJ <wamre...@googlemail.com>

> From my tests, it seems that the messages on fooA are only sent once the
> entire block has executed, i.e. after "do stuff B". It is an issue for me
> since "do stuff B" is time consuming and I need the messages as soon as
> possible.

> Is there a way to do this? Or am I missing something?

This means that either

# do stuff A

or

# do stuff B

blocks the event loop. Subscribe blocks are executed on the EventMachine
event loop thread, and published data is only written to the socket once
your handler returns control to the loop. Use EventMachine.defer to execute
the slow parts in a thread pool, or make sure they do not block. This is a
known limitation of all event loops.

--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin
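
To make the point above concrete, here is a minimal stdlib-only sketch. It is not EventMachine and not the amqp gem: `ToyReactor`, `run_demo`, and the log strings are names invented for illustration. The toy loop only flushes buffered publishes after the current job returns, which is roughly why a blocking "do stuff B" delays the message, and why moving that work to another thread lets the message go out first.

```ruby
require "thread"

# Toy single-threaded event loop (NOT EventMachine): it runs one job at a
# time and only flushes buffered "publishes" after the current job returns.
class ToyReactor
  attr_reader :log

  def initialize
    @jobs = Queue.new
    @outbox = []
    @log = []
    @log_lock = Mutex.new
  end

  def schedule(&job)
    @jobs << job
  end

  # Stand-in for exchange.publish: the message is only buffered here.
  def publish(message)
    @outbox << message
  end

  # Thread-safe append to the event log.
  def note(event)
    @log_lock.synchronize { @log << event }
  end

  def stop
    @jobs << :stop
  end

  def run
    while (job = @jobs.pop) != :stop
      job.call
      note("sent #{@outbox.shift}") until @outbox.empty?
    end
  end
end

# Runs one handler that publishes and then does slow work, either inline
# (blocking the loop) or on a separate thread (the "deferred" case).
def run_demo(defer:)
  reactor = ToyReactor.new
  slow_work = lambda do
    sleep 0.1                 # "do stuff B"
    reactor.note("B done")
    reactor.stop
  end
  reactor.schedule do
    reactor.publish("fooA")
    defer ? Thread.new(&slow_work) : slow_work.call
  end
  reactor.run
  reactor.log
end

p run_demo(defer: false)
p run_demo(defer: true)
```

In the blocking run the "sent" entry is logged after "B done"; in the deferred run it is logged before. A real reactor has more moving parts, so treat this only as a model of the ordering.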


 
From: PJ <wamre...@googlemail.com>
Date: Tue, 13 Nov 2012 17:16:45 -0800 (PST)
Local: Tues, Nov 13 2012 8:16 pm
Subject: Re: [ruby-amqp] Publish messages within a subscribe block

Thanks a lot Michael. Defer is really helpful!


 
From: PJ <wamre...@googlemail.com>
Date: Thu, 22 Nov 2012 03:30:31 -0800 (PST)
Local: Thurs, Nov 22 2012 6:30 am
Subject: Re: Publish messages within a subscribe block
Everything has been working fine with defer, until I started to load
test my system. Just to give a bit more details, here is what I am
doing (simplified, only showing messages related things):

EventMachine.run do
  foo_queue.subscribe do |payload|
    # ... do stuff
    ex.publish(stuff, :routing_key => "fooA")

    operation = proc {
      # Section A: do stuff, including access to the DB; returns result
      ex.publish(stuff, :routing_key => "fooA")
      result
    }

    callback = proc {|result|
      if result
        ex.publish(stuff, :routing_key => "fooA") ## Section B
      end
    }

    EventMachine.defer(operation, callback)

  end
end

It is difficult to understand exactly what is failing, but I am
getting errors such as:

In Section A: <internal:prelude>:8:in `lock': deadlock detected (fatal)

In Section B: /usr/lib64/ruby/gems/1.9.1/gems/amq-client-0.9.8/lib/amq/client/async/adapter.rb:247:in `send_frame': Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007f3b049fbd98 @payload="\x00<\x00(\x00\x00\x00\x1E941359_test_machine\x00", @channel=1> (AMQ::Client::ConnectionClosedError)

It "looks like" (I don't know to be honest) a thread safety issue, but
I am not sure how to address this.

Thanks a lot
PJ



 
From: Laust Rud Jacobsen <laust....@gmail.com>
Date: Thu, 22 Nov 2012 12:36:35 +0100
Local: Thurs, Nov 22 2012 6:36 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block
Quick note: once you use .defer, the code in the defer block runs on a
separate thread. To execute code back on the main reactor thread, put
that code in an EM.next_tick block. Maybe that's what you're missing:
access the AMQP objects only on the main reactor thread.

--
Laust Rud Jacobsen
la...@object.io | blog: object.io | skype:object.io | +45 2588 4665



 
From: Michael Klishin <michael.s.klis...@gmail.com>
Date: Thu, 22 Nov 2012 17:17:03 +0400
Local: Thurs, Nov 22 2012 8:17 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

2012/11/22 PJ <wamre...@googlemail.com>

> It "looks like" (I don't know to be honest) a thread safety issue, but
> I am not sure how to address this.

Not necessarily. It may be an uncaught exception (when the connection went
down, AMQP::Exchange#publish raised an exception). Channels synchronize
publishing, but it is still highly recommended not to share them between
threads. Check the RabbitMQ log; there has to be something in it about why
the connection was closed.

--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin


 
From: PJ <wamre...@googlemail.com>
Date: Thu, 22 Nov 2012 06:34:11 -0800 (PST)
Local: Thurs, Nov 22 2012 9:34 am
Subject: Re: Publish messages within a subscribe block
Thanks guys.

Laust: thing is my code takes time to execute, so I'd rather have it
executed in a background thread using defer rather than in the main
loop.

Michael: you are right. It looks like messages got interleaved. The
log shows:

connection <0.3418.0> (running), channel 1 - error:
{amqp_error,unexpected_frame,
            "expected content body, got non content body frame instead",
            'basic.publish'}

I understand what you are saying regarding not sharing channels
between threads, but then I am not sure how to do that if you want to
use defer (considering the block might be executed in any of the
pool's threads). Does it mean one should not publish anything within a
deferred block, and instead find a way to "postpone" it to the
callback?
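
As background for the broker error above: in AMQP 0-9-1 a publish goes over the wire as a method frame, then a content header frame, then the body frames, all on one channel. If two threads write to the same channel without coordination, those frames can interleave and the broker rejects the sequence. The following stdlib-only sketch models that with plain arrays; `FRAMES`, `publish`, `well_formed?`, and `run_publishers` are hypothetical names, and the mutex here only illustrates the idea of serializing a whole publish, not how the amqp gem implements it.

```ruby
require "thread"

# Simplified frame sequence for one publish on a channel.
FRAMES = %i[method header body].freeze

# Writes all frames of one message while holding the lock, so no other
# publisher can slip a frame in between them.
def publish(wire, lock, id)
  lock.synchronize do
    FRAMES.each do |kind|
      wire << [id, kind]
      Thread.pass # invite a context switch to make interleaving likely
    end
  end
end

# True when every group of three frames belongs to a single message and
# arrives in method, header, body order.
def well_formed?(wire)
  wire.each_slice(FRAMES.size).all? do |slice|
    slice.map(&:last) == FRAMES && slice.map(&:first).uniq.size == 1
  end
end

# Starts `count` publisher threads that share one wire and one lock.
def run_publishers(count)
  wire = []
  lock = Mutex.new
  Array.new(count) { |id| Thread.new { publish(wire, lock, id) } }.each(&:join)
  wire
end

puts well_formed?(run_publishers(8))
```

Without the lock, the same threads could produce a sequence such as method, method, header, which corresponds to the "expected content body, got non content body frame instead" complaint.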



 
From: Michael Klishin <michael.s.klis...@gmail.com>
Date: Thu, 22 Nov 2012 18:48:17 +0400
Local: Thurs, Nov 22 2012 9:48 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

2012/11/22 PJ <wamre...@googlemail.com>

> connection <0.3418.0> (running), channel 1 - error:
> {amqp_error,unexpected_frame,
>             "expected content body, got non content body frame instead",
>             'basic.publish'}

> I understand what you are saying regarding not sharing channels
> between threads, but then I am not sure how to do that if you want to
> use defer (considering the block might be executed in any of the
> pool's threads).

What version do you use? We added a mutex that synchronizes publishing
months ago. Do you have wireshark installed by any chance? Could you
record a session for me?

> Does it mean one should not publish anything within a
> deferred block, and instead find a way to "postpone" it to the
> callback?

No, that would make things needlessly difficult. I would just open two
channels, one only used for publishing, so that the publishing channel is
in the scope of what you are deferring. Simply using separate channels is
all you need.

I am very curious under what circumstances with recent versions you are
getting frames interleaved on the same channel.
--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin


 
From: François Beausoleil <francois.beausol...@gmail.com>
Date: Thu, 22 Nov 2012 14:42:21 -0500
Local: Thurs, Nov 22 2012 2:42 pm
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

Le 2012-11-22 à 09:34, PJ a écrit :

> Laust: thing is my code takes time to execute, so I'd rather have it
> executed in a background thread using defer rather than in the main
> loop.

What Laust meant is this:

EM.defer do
  # do stuff
  EM.next_tick do
    exchange.publish(...)
  end
end

That way, the publication always happens in the same thread, namely the EventMachine one.

Bye!
François
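
The pattern in the snippet above (slow work on pool threads, publishing handed back to one thread) can be modelled without EventMachine. This is a stdlib-only sketch under stated assumptions: the `ticks` queue plays the role of EM.next_tick, the `published` array plays the role of the exchange, and `publish_via_reactor` is a hypothetical helper name.

```ruby
require "thread"

# Runs `worker_count` background threads; each computes a result and then
# hands a "publish" job back to a single reactor thread through a queue.
# Returns the published entries and the reactor thread for inspection.
def publish_via_reactor(worker_count)
  ticks = Queue.new # jobs to run on the reactor thread
  published = []    # only ever touched by the reactor thread

  reactor = Thread.new do
    while (job = ticks.pop) != :stop
      job.call
    end
  end

  workers = Array.new(worker_count) do |i|
    Thread.new do
      result = i * i # stand-in for slow work such as DB access
      # Thread.current is evaluated when the job runs, i.e. on the reactor.
      ticks << -> { published << [result, Thread.current] }
    end
  end

  workers.each(&:join)
  ticks << :stop
  reactor.join
  [published, reactor]
end

published, reactor = publish_via_reactor(4)
puts published.map(&:first).sort.inspect
puts published.all? { |(_, thread)| thread == reactor }
```

Because every publish job runs on the same thread, the order of frames on a shared channel no longer depends on how the pool threads are scheduled.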


 
From: PJ <wamre...@googlemail.com>
Date: Wed, 28 Nov 2012 03:49:09 -0800 (PST)
Local: Wed, Nov 28 2012 6:49 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

Guys, I apologize that it took me so long to get back to you, but this is
proving very challenging for me. And again, just to be clear: everything is
absolutely fine under "normal" use. It only starts to fail under heavy
load.

Thank you François. Indeed, I misunderstood what Laust had said. I am
trying this solution but it still fails from time to time.

Michael, sorry it took me a lot of time to get back to you: it is hard to
reproduce this specific error. It happens very randomly. I am using:
- services side:
amq-client (0.9.9)
amq-protocol (0.9.5)
amqp (0.9.8)
- client side (Rails 3):
bunny (0.8.0)

(unfortunately I can't install wireshark on these boxes at present)

I have now refactored my code to try and combine Laust's and your
suggestions. I am still getting random problems under load though. So just
two questions:

- is it "bad" to nest deferred blocks (I am now making sure all "publish"
actions only happen in the callbacks)?

EventMachine.run do

  operation = proc {
    foo1()
  }
  callback = proc {|result|
    foo2(result)

    operation2 = proc {
      foo3(result)
    }
    callback2 = proc {|result2|
      foo4(result2)
    }
    EventMachine.defer(operation2, callback2)

  }
  EventMachine.defer(operation, callback)

end

- Michael, when you say "I would just open two channels, one only used for
publishing", I am not sure I follow. Are you saying I should do something
like:

  EventMachine.run do
    connection = AMQP.connect(config[:amqp_server])
    read_channel  = AMQP::Channel.new(connection)
    read_ex = read_channel.default_exchange
    write_channel  = AMQP::Channel.new(connection)
    write_ex = write_channel.default_exchange

    ...
  end

and make sure all the "subscribe" calls are bound to the read_channel, and
all the "publish" calls to the write_channel? Is it still recommended if I
make sure all the "publish" actions happen in the reactor loop?

Thanks
PJ


 
From: Michael Klishin <michael.s.klis...@gmail.com>
Date: Wed, 28 Nov 2012 16:39:10 +0400
Local: Wed, Nov 28 2012 7:39 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

2012/11/28 PJ <wamre...@googlemail.com>

> - Michael, when you say "I would just open two channels, one only used for
> publishing", I am not sure I follow. Are you saying I should do something
> like:

>   EventMachine.run do
>     connection = AMQP.connect(config[:amqp_server])
>     read_channel  = AMQP::Channel.new(connection)
>     read_ex = read_channel.default_exchange
>     write_channel  = AMQP::Channel.new(connection)
>     write_ex = write_channel.default_exchange

>     ...
>   end

Yes.

I am curious whether using the next Bunny release may be easier for you. It
does not have this problem because consumer handlers are executed in a
separate thread from network activity. See

https://groups.google.com/forum/?fromgroups=#!topic/ruby-amqp/UMiexQt...

--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin


 
From: PJ <wamre...@googlemail.com>
Date: Wed, 28 Nov 2012 06:08:13 -0800 (PST)
Local: Wed, Nov 28 2012 9:08 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

Michael,

I will try this. I don't know if it makes any sense, but indeed, it seems
the problems stem from the client side (which uses Bunny), not the services
(which use AMQP). Indeed, I am doing two types of load tests:

a- from within a Rails console (i.e. pure AMQP messaging, no http requests).
b- using Celerity (i.e. "real" browsers and requests)

With a-, I can load the services like crazy and don't get a single issue.
With b-, some of my services start to fail randomly.

On this, is there a recommended way to use Bunny from Rails / Passenger?
Right now, I am doing it very bluntly: every time the client app needs to
send/receive a message, I start a connection, then close it. It means a lot
of connections/disconnections.

PJ


 
From: Michael Klishin <michael.s.klis...@gmail.com>
Date: Wed, 28 Nov 2012 18:13:44 +0400
Local: Wed, Nov 28 2012 9:13 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

2012/11/28 PJ <wamre...@googlemail.com>

> On this, is there a recommended way to use Bunny from Rails / Passenger?
> Right now, I am doing it very bluntly: every time the client app needs to
> send/receive a message, I start a connection, then close it. It means a lot
> of connections/disconnections.

Connect in an initializer. Bunny 0.9 does not try to hide channels from
you, so you will also have to open a channel. The same goes for any other
application, more or less.

Bunny 0.9 does not currently have network partition handling, but that is
at the top of the list.
--
MK

http://github.com/michaelklishin
http://twitter.com/michaelklishin
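
The advice above amounts to opening the connection once per process and reusing it, instead of connecting and disconnecting for every message. A minimal stdlib-only sketch of that shape follows. `FakeConnection` and `Messaging` are hypothetical names standing in for a real client connection and for whatever an initializer would set up; no Bunny API is used or implied here.

```ruby
require "thread"

# Hypothetical stand-in for a real client connection; it counts how many
# times a connection has been opened.
class FakeConnection
  @opened = 0
  class << self
    attr_accessor :opened
  end

  def initialize
    self.class.opened += 1
  end

  def publish(message)
    "published #{message}"
  end
end

# Process-wide holder: the connection is opened on first use, then reused.
module Messaging
  LOCK = Mutex.new

  def self.connection
    LOCK.synchronize { @connection ||= FakeConnection.new }
  end
end

# Each simulated request reuses the same connection object.
results = Array.new(5) { |i| Messaging.connection.publish("msg #{i}") }
puts results.last
puts "connections opened: #{FakeConnection.opened}"
```

The lock only guards the lazy creation, so concurrent first requests do not open two connections. Whether a real connection object can then be used from several threads depends on the client library.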


 
From: PJ <wamre...@googlemail.com>
Date: Wed, 28 Nov 2012 08:53:29 -0800 (PST)
Local: Wed, Nov 28 2012 11:53 am
Subject: Re: [ruby-amqp] Re: Publish messages within a subscribe block

Michael, thank you SO much.

I replaced Bunny 0.8.0 with 0.9.0.pre1 and I don't see any errors anymore
under load. Everything seems fine. Even all the "begin / rescue / retry"
blocks I had in my code don't seem necessary anymore.

I will report back if further testing yields errors, but so far it is all
good.

And the test suite is super helpful to understand the new API.

Thanks a million
PJ


 