Use within apache/mod_ruby


dan

Feb 16, 2009, 1:05:23 PM
to AMQP
Hi,

I am trying to use amqp in mod_ruby to place a database lookup into a
queue so that one of a pool of servers can reply. I have just a simple
no-op case and after the first turn (and a successful 1st reply)
apache's IO configuration is left in an unusable state so that all
subsequent hits to a particular worker just close the socket without
any data being written. Oddly enough, the error_log shows that the
request came in and the AMQP round trip was successful, but apache is
unable to write to the file descriptor that it allocated for the reply.
I think it is the result of the stop sequence at the end of the first
hit that does it.

This is my mod_ruby program:

require 'mq'

AMQP.logging=true

require 'singleton'

class AmqpTest
  include Singleton

  def handler apr
    apr.status=::Apache::HTTP_OK
    apr.content_type="text/plain"
    apr.send_http_header
    apr.write("reply:\n")

    r=nil

    AMQP.start {

      amq=MQ.new
      ns=0

      pool=amq.queue('pool')

      rqname='reply-'+MQ.id

      reply=amq.queue(rqname, :auto_delete=>true)

      $stderr.puts "sending" + tt=(Time.now.to_s)
      $stderr.puts("5")
      pool.publish(rqname + "|"+ tt)

      reply.subscribe {|headers,body|
        r="got [#{headers.inspect}] #{body}"
        $stderr.puts "got [#{headers.inspect}] #{body}"
        AMQP.stop { EM.stop }
      }
    }

    apr.write(r)
    apr.write("\n")
    ::Apache::OK
  end
end


The end of an strace of a successful round trip is:

writev(9, [{"HTTP/1.1 200 OK\r\nDate: Mon, 16 F"..., 206}, {"17ac\r\n", 6}, {"reply:\n[\"connected\"]\n\n[\"receive\""..., 6060}, {"\r\n", 2}], 4) = 6274
writev(9, [{"0\r\n\r\n", 5}], 1) = 5
read(9, 0x16545098, 8000) = -1 EAGAIN (Resource temporarily unavailable)
write(8, "10.100.1.79 - - [16/Feb/2009:12:"..., 71) = 71
poll([{fd=9, events=POLLIN, revents=POLLIN}], 1, 5000) = 1
read(9, "", 8000) = 0
gettimeofday({1234807160, 342188}, NULL) = 0
shutdown(9, 1 /* send */) = 0
poll([{fd=9, events=POLLIN, revents=POLLIN|POLLHUP}], 1, 2000) = 1
read(9, "", 512) = 0
close(9) = 0

And this is a bad one:

writev(0, [{"HTTP/1.1 200 OK\r\nDate: Mon, 16 F"..., 206}, {"17ac\r\n", 6}, {"reply:\n[\"connected\"]\n\n[\"receive\""..., 6060}, {"\r\n", 2}], 4) = -1 EBADF (Bad file descriptor)
writev(0, [{"0\r\n\r\n", 5}], 1) = -1 EBADF (Bad file descriptor)
read(0, 0x166e7c48, 8000) = -1 EBADF (Bad file descriptor)
write(8, "10.100.1.79 - - [16/Feb/2009:13:"..., 71) = 71
close(0) = -1 EBADF (Bad file descriptor)
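
As an illustration of the symptom in the second trace, and not a diagnosis of it: once a descriptor number has been closed by one part of a process, any other code still holding that number gets EBADF on its next write. The sketch below reproduces this with a pipe in plain Ruby. The name `alias_io` is made up for the example, and nothing here touches Apache, mod_ruby or EventMachine.

```ruby
# Two IO objects sharing one descriptor number, standing in for
# "the socket Apache expects" and "other code that closes the same fd".
reader, writer = IO.pipe
alias_io = IO.for_fd(writer.fileno, "w", autoclose: false)

# While the descriptor is open, writes through the alias succeed.
alias_io.syswrite("first write works\n")
first = reader.gets

# Closing through one object invalidates the number for every holder.
writer.close

second =
  begin
    alias_io.syswrite("never delivered\n")
    :written
  rescue Errno::EBADF
    :ebadf
  end

puts first
puts second
```

If the stop sequence (or anything else in the worker) closes a descriptor that Apache later expects to own, the failing writev/close calls above are what one would expect to see.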

Aman Gupta

Feb 21, 2009, 2:06:02 AM
to AMQP
Very strange. I haven't used passenger yet and am not sure if it plays
well with EM. What happens if you try running EM in a thread?

Thread.new{
  EM.run{}
}

How do you hook up AmqpTest to mod_ruby?

Aman

stephenjudkins

Feb 27, 2009, 8:14:21 PM
to AMQP
Passenger seems to be playing great with AMQP. There were a couple
issues I had to resolve to get it working. Here's the code that
allows EventMachine/AMQP to start and stop gracefully:

current = Thread.current
Thread.new do
  AMQP.start(:host => AMQP_HOST) do
    current.wakeup
  end
end
Thread.stop

# catch these, stop AMQP, stop eventmachine, and re-throw to Mongrel/Passenger's signal traps
EM.run do
  ['INT', 'TERM'].each do |sig|
    old = trap(sig) do
      AMQP.stop do
        EM.stop
        old.call
      end
    end
  end
end

This resolved two issues: the 'MQ can only be used from within EM.run
{}' messages that were occasionally thrown when a message was sent
before EM was up and running in its own thread, and strangeness where
Ruby worker processes wouldn't die.
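
The "wait until the background loop is up" handshake described above can also be sketched with a Queue, which avoids depending on the relative order of Thread.stop and wakeup. This is a minimal sketch under assumptions: `start_background_loop` is a hypothetical stand-in for starting AMQP/EventMachine in a thread, and the sleeps only simulate connection time.

```ruby
require 'thread'

# Stand-in for "start the reactor in a background thread and signal once
# it is up". A real setup would call AMQP.start inside this thread.
def start_background_loop(ready)
  Thread.new do
    sleep 0.05          # pretend to connect
    ready << :connected # signal the waiting thread exactly once
    sleep               # keep running, the way a reactor would
  end
end

ready = Queue.new
loop_thread = start_background_loop(ready)

# Queue#pop blocks until the signal arrives and does not miss it if the
# background thread signals before the caller starts waiting.
state = ready.pop
puts state
puts loop_thread.alive?
```

Only after `ready.pop` returns would the caller go on to publish, which is the same guarantee the Thread.stop / wakeup pair is meant to provide.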

Matt Murphy

Apr 8, 2009, 2:30:13 AM
to AMQP
Hmm... I tried this and it didn't work on my setup... AMQP tries
repeatedly and fails to open a connection. Anyone have any ideas?
(output below):

If I don't use Stephen's threads trick AMQP connects fine (but the app
only serves 5 or 6 requests b/c all of the rails processes hang...)

["send",
#<AMQP::Frame::Method:0x657b010
@channel=1,
@payload=
#<AMQP::Protocol::Channel::Open:0x657b3d0 @debug=1,
@out_of_band=nil>>]

["send",
#<AMQP::Frame::Method:0x657a958
@channel=1,
@payload=
#<AMQP::Protocol::Channel::Open:0x657ad18 @debug=1,
@out_of_band=nil>>]

etc...

Aman Gupta

Apr 8, 2009, 10:35:36 AM
to ruby...@googlegroups.com
Sounds like it's repeatedly opening new channels. Every MQ.new, or any
use of MQ from a new thread, does this. I would recommend creating one
global channel and using that everywhere:

environment.rb:

$mq = MQ.new

controller.rb:

$mq.queue(...).publish(...)
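
A memoised accessor is one alternative to a bare global for the "one channel everywhere" idea. The sketch below is illustrative only: `FakeChannel` and `Messaging` are hypothetical names, and `FakeChannel` stands in for MQ so the example runs without a broker. It just counts how many channels get opened.

```ruby
# Hypothetical stand-in for MQ; counts how many channels were opened.
class FakeChannel
  @opened = 0
  class << self
    attr_accessor :opened
  end

  def initialize
    self.class.opened += 1
  end

  def publish(queue_name, msg)
    "#{queue_name} <- #{msg}"
  end
end

# Memoised accessor: the channel is created on first use and reused after.
module Messaging
  def self.channel
    @channel ||= FakeChannel.new
  end
end

3.times { |i| Messaging.channel.publish('pool', "msg #{i}") }
puts FakeChannel.opened
```

Every caller goes through `Messaging.channel`, so repeated publishes reuse the same object instead of opening a channel per request.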


Matt Murphy

Apr 8, 2009, 1:33:46 PM
to ruby...@googlegroups.com
Aman --

I am now using the global MQ as you suggested. Now the log shows Publish, Header, and Body messages being passed, seemingly correctly... BUT

on the server running rabbitmq, rabbitmqctl list_queues shows no messages are being added to the queue.

Oddly, when I just start the app without the separate thread, about 6 new messages do get added to the queue before the app hangs (probably passenger is creating six processes)...

Just curious if you have any suggestions for further debugging.

dan.simpson

Apr 9, 2009, 4:17:59 PM
to AMQP
We are experiencing the same issue. We can't seem to get actions from
rails to send messages to our topic exchange. However, if we call the
exact same "action" from script/console, it works. In other words,
passenger and AMQP or EM are not happy.

We are running the latest version of passenger and rails 2.3.2.

We are initializing AMQP as shown above.

Thanks,
Dan


Matt Todd

Apr 9, 2009, 5:35:00 PM
to AMQP
I encountered this when writing a plugin that required a background
Thread and had the plugin deployed in Passenger.

Passenger is unusual both in how it is set up and in how it creates its
child processes: it will initially run your config/boot.rb and
config/environment.rb in one thread, then it will fork the process and
begin handling requests from the new child process.

This means that background threads (in Ruby, and like the ones in
EventMachine, I'd imagine) are left behind in the old process.
Effectively, the reactor is left running but is in another process.
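
The effect can be seen without Passenger or EventMachine. The following sketch, which assumes a platform where `fork` is available, starts a plain Ruby thread as a stand-in for the reactor thread, forks, and asks both processes whether that thread is still alive.

```ruby
# A background thread started before fork, standing in for the reactor.
background = Thread.new { sleep }
sleep 0.05 # give it a moment to start

reader, writer = IO.pipe

pid = fork do
  reader.close
  # In the child, only the thread that called fork is carried over.
  writer.puts background.alive?
  writer.close
  exit!(0)
end

writer.close
child_view = reader.read.strip
Process.wait(pid)

parent_view = background.alive?
puts "parent: #{parent_view}, child: #{child_view}"
```

The parent should still see the thread running while the child reports it as dead, which matches the description of the reactor being left behind in the old process.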

Based on my previous solution, you'll want to record the PID of the
background Thread and save the thread as $emt or something; then
before you queue, you check $emt[:pid] == $$ and if not, start up the
reactor again... obviously, you'd want to make this a little cleaner.

The first bit of my own solution can be found here:
http://github.com/highgroove/scout_rails_instrumentation/tree/master/lib/scout.rb#L101-103
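
A rough sketch of the PID check described above, written as a small helper. `ReactorGuard` is a hypothetical name, not something taken from the linked code, and the PID source is injectable only so the behaviour can be shown without actually forking.

```ruby
# Hypothetical helper: remembers which process started the background
# thread and starts it again when the PID has changed.
class ReactorGuard
  attr_reader :starts

  # current_pid is injectable for demonstration; by default it reads the
  # real process id.
  def initialize(current_pid: -> { Process.pid }, &start)
    @current_pid = current_pid
    @start = start
    @owner_pid = nil
    @starts = 0
  end

  def ensure_running
    pid = @current_pid.call
    return if @owner_pid == pid

    @thread = @start.call # e.g. a thread running the reactor in a real app
    @owner_pid = pid
    @starts += 1
  end
end

fake_pid = 100
guard = ReactorGuard.new(current_pid: -> { fake_pid }) { Thread.new { sleep } }

guard.ensure_running # first call starts the thread
guard.ensure_running # same PID: nothing happens
fake_pid = 101       # simulate running in a forked child
guard.ensure_running # PID changed: start again
puts guard.starts
```

Calling `ensure_running` before every publish keeps the check cheap in the common case (one integer comparison) and restarts the thread only in a freshly forked worker.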

Hope this helps!

Matt




Aman Gupta

Apr 10, 2009, 6:12:16 PM
to AMQP
If all you're doing from your rails app is publishing messages, you
don't necessarily need EM or the async API and might want to look at
Amos' new synchronous AMQP client: http://github.com/famoseagle/carrot

To get EM working with Passenger, it sounds like EM needs to be
started after the fork. One way to do this would be to move the
initialization code out of environment.rb and have it occur lazily
within the controller itself:

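# defined? avoids a NameError the first time, when @@amqp_connected is unset
unless defined?(@@amqp_connected) && @@amqp_connected
  @@amqp_connected = true
  th = Thread.current
  # set $mq before waking the caller so it is never seen as nil
  Thread.new{ AMQP.start(...){ $mq = MQ.new; th.wakeup } }
  Thread.stop
end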

$mq.queue('name').publish('msg')

Aman
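
If several request threads can reach the lazy initialization at once, a mutex keeps the setup from running twice. This is a minimal sketch under that assumption: `LazyConnection` is a hypothetical wrapper, and the block passed to it stands in for the AMQP.start / MQ.new setup.

```ruby
# Hypothetical wrapper: runs the given setup block at most once and
# hands the same result to every caller.
class LazyConnection
  def initialize(&connect)
    @connect = connect
    @lock = Mutex.new
    @value = nil
  end

  def get
    # The mutex keeps two threads from both running the setup.
    @lock.synchronize { @value ||= @connect.call }
  end
end

setup_calls = 0
conn = LazyConnection.new do
  setup_calls += 1
  :channel
end

results = 4.times.map { Thread.new { conn.get } }.map(&:value)
puts setup_calls
```

In a controller this would replace the flag check: each action calls `conn.get` and publishes on whatever it returns.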


amos

Apr 13, 2009, 2:14:32 AM
to AMQP
I've been using my sync client in production for a few days now and
haven't had many problems. I'm processing about 2 million messages a
day and we're successfully publishing from Passenger.

Some caveats: I've only tested publish and pop in production so if
you're trying to do something a bit more fancy, you may run into
issues. Also, I'm ignoring rabbit's flow controls which is generally a
bad idea. If I'm not mistaken so is Aman. (Aman, please correct me if
I'm wrong.)

I would like to add threads to handle flow and subscribe to multiple
queues, but I may not have time since my wife is almost 9 months
pregnant. If anyone wants to contribute, let me know.

- Amos

P.S. Most of my lib is just a non-eventmachine wrapper around amqp.
Aman did most of the heavy lifting.

amos

Apr 13, 2009, 2:26:17 AM
to AMQP
If you're using amqp with workling you may want to check out
sweat_shop which uses carrot:

http://github.com/famoseagle/sweat_shop/tree/master

dan.simpson

Apr 9, 2009, 4:57:59 PM
to AMQP
We are having the same problem. We have tried to globalize the object
as well. We are able to get it working with mongrel, or even by
executing the method in script/console. So, we believe this problem
is specific to passenger. We are running passenger 2.1.2 and rails
2.3.2. It's difficult for us to determine what is causing the
problem, but it seems that the passenger rails instances are failing
to send the message. We are going to try a few things to help us
determine what is going on, but right now it's a show stopper for the
project.

--Dan


Aman Gupta

Apr 18, 2009, 6:47:48 PM
to AMQP
The correct way to run EM or AMQP within passenger is using the post
fork hook they provide
(http://www.modrails.com/documentation/Users%20guide.html#_smart_spawning_gotcha_2_the_need_to_revive_threads).
In your environment.rb or config.ru:

require 'mq'
if defined?(PhusionPassenger)
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    if forked
      if EM.reactor_running?
        EM.stop_event_loop
        EM.release_machine
        EM.instance_variable_set( '@reactor_running', false )
      end
      Thread.current[:mq] = nil
      AMQP.instance_variable_set('@conn', nil)
    end

    th = Thread.current
    Thread.new{
      AMQP.connect(:host => 'localhost'){
        th.wakeup
      }
    }
    Thread.stop
  end
end

Then you should be able to use MQ.default from your controllers:

MQ.queue('name').publish('msg')

Aman

Matt Murphy

Apr 20, 2009, 3:14:31 PM
to ruby...@googlegroups.com
Thanks much for posting this, Aman!