AMQP from Rails, crashing intermittently


AlexKane

Jan 29, 2009, 11:39:59 AM
to AMQP
I'm almost ready to deploy my AMQP implementation, which runs in
a Rails 1.2.3 app. During testing, however, we're seeing the mongrels
lock up after connecting to the queue a few times. Similarly, from
the Rails console I'm able to send a few messages, but then I get the
following error (the message is sent in before_update):

>> p.save
=> true
>> p.save
=> true
>> p.save
=> true
/usr/local/lib/ruby/1.8/irb/input-method.rb:97:in `readline': stdin closed (IOError)
from /usr/local/lib/ruby/1.8/irb/input-method.rb:97:in `gets'
from /usr/local/lib/ruby/1.8/irb.rb:132:in `eval_input'
from /usr/local/lib/ruby/1.8/irb.rb:263:in `signal_status'
from /usr/local/lib/ruby/1.8/irb.rb:131:in `eval_input'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:189:in `call'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:189:in `buf_input'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:104:in `getc'
from /usr/local/lib/ruby/1.8/irb/slex.rb:206:in `match_io'
... 9 levels...
from /usr/local/lib/ruby/1.8/irb.rb:70:in `start'
from /usr/local/lib/ruby/1.8/irb.rb:69:in `catch'
from /usr/local/lib/ruby/1.8/irb.rb:69:in `start'
from /usr/local/bin/irb:13


There's nothing in the error that points to AMQP, but I don't get the
error when I comment out the AMQP stuff in the following method:

def before_update
  require 'amqp'
  require 'mq'
  message = parature_hash.merge({:message_type => 'update', :lookup_email => parature_email}).to_yaml
  AMQP.logging = false

  begin
    AMQP.start(:host => "my.message_host") do
      publish = MQ.queue("/parature", :durable => true).publish(message.to_s, :persistent => true)
      AMQP.stop{EM.stop}
    end
  rescue => e
    raise "AMQP Error: #{e.message}"
  end
  true
end

AlexKane

Jan 29, 2009, 12:28:43 PM
to AMQP
I tried wrapping the EM code in a synchronize block and I'm now
getting a deadlock:

=> true
>> Person.find(1234).save
=> true
>> Person.find(1234).save
deadlock 0x21e0710: sleep:F(4) - /usr/local/lib/ruby/1.8/drb/drb.rb:944
deadlock 0x346fc: sleep:S (main) - /usr/local/lib/ruby/gems/1.8/gems/wirble-0.1.2/./wirble.rb:418
/usr/local/lib/ruby/1.8/irb.rb:159:in `write': Bad file descriptor (Errno::EBADF)
from /usr/local/lib/ruby/1.8/irb.rb:159:in `print'
from /usr/local/lib/ruby/1.8/irb.rb:159:in `eval_input'
from /usr/local/lib/ruby/1.8/irb.rb:263:in `signal_status'
from /usr/local/lib/ruby/1.8/irb.rb:147:in `eval_input'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:244:in `each_top_level_statement'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:230:in `loop'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:230:in `each_top_level_statement'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:229:in `catch'
from /usr/local/lib/ruby/1.8/irb/ruby-lex.rb:229:in `each_top_level_statement'
from /usr/local/lib/ruby/1.8/irb.rb:146:in `eval_input'
from /usr/local/lib/ruby/1.8/irb.rb:70:in `start'
from /usr/local/lib/ruby/1.8/irb.rb:69:in `catch'
from /usr/local/lib/ruby/1.8/irb.rb:69:in `start'
from /usr/local/bin/irb:13

My code now looks like:

def before_update
  require 'thread'
  semaphore = Mutex.new

  message = parature_hash.merge({:message_type => 'update', :lookup_email => parature_email}).to_yaml
  AMQP.logging = false

  semaphore.synchronize{
    AMQP.start(:host => "message_host") do
      publish = MQ.queue("/parature", :durable => true).publish(message.to_s, :persistent => true)
      AMQP.stop{EM.stop}
    end
  }

end

AlexKane

Jan 29, 2009, 12:48:20 PM
to AMQP
I was just reading some older threads on this subject and came across
http://groups.google.com/group/ruby-amqp/browse_thread/thread/be805632e57e4dd2/7489794612a07663?hl=en&lnk=gst&q=rails#7489794612a07663
(Should have done this before posting, sorry).

What's the best way to publish a message from a Rails app that's not
running evented mongrel?

~ Alex

Aman Gupta

Jan 29, 2009, 1:16:30 PM
to ruby...@googlegroups.com
In environment.rb:

Thread.new{ AMQP.start(:host => 'localhost') }

Sent from my iPhone

AlexKane

Jan 29, 2009, 1:52:40 PM
to AMQP
Thanks for the reply from the iPhone. I've got to say, support for AMQP
is great :-)

I've added that line to environment.rb, replacing localhost with the
hostname of my broker. Do I now need to modify my client code to use
this thread? Not sure how to do that.

thanks
~ alex


Chuck Remes

Jan 29, 2009, 2:18:45 PM
to ruby...@googlegroups.com
Alex,

You do not need to modify your client code. Any calls to MQ (or AMQP)
will "just work."

cr

Aman Gupta

Jan 29, 2009, 2:27:26 PM
to ruby...@googlegroups.com
Correct. You don't need to worry about the threads, simply change your
model to use MQ directly:

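def before_update
  # Build the payload the same way as in the original callback.
  message = parature_hash.merge(:message_type => 'update',
                                :lookup_email => parature_email).to_yaml
  MQ.queue("/parature", :durable => true).publish(message.to_s, :persistent => true)
  true # a before_update callback that returns false cancels the save
end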

Aman

Doug Barth

Jan 29, 2009, 3:03:55 PM
to AMQP
On Jan 29, 1:27 pm, Aman Gupta <themastermi...@gmail.com> wrote:
> Correct. You don't need to worry about the threads, simply change your
> model to use MQ directly:

I was quite surprised to read about this approach in the comments in
the 0.6.0 version of amqp. Aman, do you know of any documentation that
covers this reactor-on-a-background-thread approach? Which calls
are thread-safe and which aren't?

--
Doug Barth

Chuck Remes

Jan 29, 2009, 3:13:50 PM
to ruby...@googlegroups.com

Doug,

I believe this works because all of the MQ calls are really
Deferrables with callbacks. Each deferred block gets pushed onto a
list (non-thread-safe by design in Ruby, but thread-safe by
implementation due to the GIL). Therefore, you can push new callbacks
onto the list from any thread, and they can be popped off and executed
by the reactor running in another thread.

But then again, I might be wrong. :)
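
To make the mechanism concrete, here is a minimal sketch that uses only Ruby's standard library. `TinyReactor` and its `schedule` method are hypothetical stand-ins invented for this example; they are not part of amqp or EventMachine. The sketch also uses a `Queue` rather than a plain list, so it does not rely on the interpreter serializing threads.

```ruby
require 'thread'

# Hypothetical stand-in for a reactor running on a background thread.
# Not EventMachine's implementation; it only illustrates the hand-off.
class TinyReactor
  def initialize
    @callbacks = Queue.new            # thread-safe FIFO from the stdlib
    @thread = Thread.new { run }      # the "reactor" loop lives here
  end

  # Safe to call from any thread: it only enqueues the block.
  def schedule(&block)
    @callbacks << block
    self
  end

  # Let the loop finish the callbacks queued so far, then wait for it.
  def stop
    @callbacks << :stop
    @thread.join
  end

  private

  def run
    loop do
      job = @callbacks.pop            # blocks until something is queued
      break if job == :stop
      job.call                        # executed on the reactor thread
    end
  end
end

reactor = TinyReactor.new
published = []
reactor_thread_ids = []

# The calling thread only pushes work; the reactor thread performs it.
3.times do |i|
  reactor.schedule do
    published << "message #{i}"
    reactor_thread_ids << Thread.current.object_id
  end
end
reactor.stop
```

Every block is queued from the calling thread but runs, in order, on the single background thread, which is the same shape as calling MQ from a model while the reactor runs elsewhere.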

cr

Aman Gupta

Jan 29, 2009, 6:13:24 PM
to ruby...@googlegroups.com
Chuck's explanation is correct, although strictly speaking Deferrable
does not currently invoke blocks in the reactor thread after the
deferrable has succeeded. This could easily be changed, however, by
wrapping those function bodies in an EM.next_tick{}. See
http://rubyeventmachine.com/ticket/95
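
As a rough illustration of the difference, the sketch below uses only Ruby's standard library. `MiniReactor`, `MiniDeferrable`, and `callback_on_reactor` are hypothetical names made up for this example; they mimic the behaviour described above and are not EventMachine's actual classes or API.

```ruby
require 'thread'

# Hypothetical reactor: runs queued blocks one at a time on its own thread.
class MiniReactor
  attr_reader :thread

  def initialize
    @ticks = Queue.new
    @thread = Thread.new do
      while (job = @ticks.pop) != :stop
        job.call
      end
    end
  end

  # Analogue of next_tick: hand a block to the reactor thread.
  def next_tick(&block)
    @ticks << block
  end

  def stop
    @ticks << :stop
    @thread.join
  end
end

# Hypothetical deferrable with two ways of registering a callback.
class MiniDeferrable
  def initialize(reactor)
    @reactor = reactor
    @succeeded = false
    @callbacks = []
  end

  def succeed
    @succeeded = true
    @callbacks.each { |cb| cb.call }
    @callbacks.clear
  end

  # Behaviour described in the post: once the deferrable has succeeded,
  # the block runs immediately, in whatever thread called this method.
  def callback(&block)
    @succeeded ? block.call : @callbacks << block
  end

  # Suggested variant: route the block through the reactor's next tick,
  # so it always runs on the reactor thread.
  def callback_on_reactor(&block)
    @succeeded ? @reactor.next_tick(&block) : @callbacks << block
  end
end

reactor = MiniReactor.new
deferrable = MiniDeferrable.new(reactor)
deferrable.succeed

direct_thread = nil
ticked_thread = nil
deferrable.callback            { direct_thread = Thread.current }
deferrable.callback_on_reactor { ticked_thread = Thread.current }
reactor.stop
```

Here the first block runs on the calling thread and the second on the reactor thread, which is the change the `next_tick` wrapping would bring.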

Personally I try to avoid threads wherever possible, but this
technique actually works quite well and is a good way of using amqp
with mongrel. I recommend keeping epoll disabled when running the
reactor in a thread, because mixing EM.epoll and ruby threads
currently introduces a significant amount of latency.

Aman

AlexKane

Feb 5, 2009, 1:53:57 PM
to AMQP
Just wanted to follow up and say that creating the EM thread in
environment.rb works like a charm. Thanks again guys!
