Forking

59 views
Skip to first unread message

Richard Heycock

unread,
Oct 30, 2009, 9:56:00 PM10/30/09
to AMQP
Hi,

I'm trying to get amqp to do the right thing when a process forks but I'm
having a few problems. The following code sort of works:

require 'mq'

AMQP.start do

MQ.queue('manage').subscribe { |h,m|
puts "[%s] %s: %s\n" % [Process.pid, :manage, m]
}

EM.fork_reactor {
MQ.queue('test').subscribe { |h,m|
puts "[%s] %s: %s\n" % [Process.pid, :test, m]
}
}
end

The first problem is that warnings are being printed to stderr:

NOT_ALLOWED - attempt to reuse consumer tag 'manage-692880059282' in AMQP::Protocol::Basic::Consume
NOT_ALLOWED - attempt to reuse consumer tag 'manage-385705733414' in AMQP::Protocol::Basic::Consume

I've done a google search on this message but there is less than one
page of results that don't give much insight into the problem.

The error happens in self.release_machine method in fork_reactor. Not
sure if that helps!


The second problem is that if I send a message to the parent process
then it gets delivered to the parent first and then to the client and so
on. If I monkey patch EM.fork_reactor to include the following it works
but it doesn't strike me as the right thing to do:

ObjectSpace.each_object(MQ) { |a| a.queues.each { |name,q| q.delete } }

I've tried numerous other things but I ended up going round in circles
without really knowing what I was doing.

Is this the correct thing to do?

rgh

Aman Gupta

unread,
Oct 30, 2009, 9:57:04 PM10/30/09
to ruby...@googlegroups.com
IIRC, there's an AMQP.fork that cleans up some of the state and calls
EM.fork_reactor.

Aman

Richard Heycock

unread,
Oct 31, 2009, 1:28:20 AM10/31/09
to ruby-amqp
Excerpts from Aman Gupta's message of Sat Oct 31 12:57:04 +1100 2009:

>
> IIRC, there's an AMQP.fork that cleans up some of the state and calls
> EM.fork_reactor.

That gives the same "attempt to reuse consumer" errors. AMQP.fork seems to be
a wrapper around EM.fork that sets Thread.current[:mq] = nil and sets the
@conn object to nil. I've already tried to set the @conn object to nil
but that did work either.

rgh

Aman Gupta

unread,
Oct 31, 2009, 1:39:00 AM10/31/09
to ruby...@googlegroups.com
Sounds like the following piece of code:

def subscribe opts = {}, &blk
@consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"

in mq/queue.rb is the problem. You must be running many thousands of
subscribers to be hitting conflicts? Either that or subscribers are
getting leaked somehow.

Aman

Richard Heycock

unread,
Nov 4, 2009, 3:18:08 AM11/4/09
to ruby-amqp
Excerpts from Aman Gupta's message of Sat Oct 31 16:39:00 +1100 2009:

>
> Sounds like the following piece of code:
>
> def subscribe opts = {}, &blk
> @consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"
>
> in mq/queue.rb is the problem. You must be running many thousands of
> subscribers to be hitting conflicts? Either that or subscribers are
> getting leaked somehow.

That's the thing, I'm not. If you paste the attached code into irb it
will produce the errors.

I notice that if I print out the process id along with the consumer
tag the errors have different pids. Also if I dump the object space in
fork_reactor there are a number of AMQP related objects open:

[[AMQP::Buffer, 15],
[AMQP::Frame::Method, 7],
[AMQP::Buffer::Overflow, 2],
[AMQP::Protocol::Connection::StartOk, 1],
[AMQP::Protocol::Connection::Start, 1],
[AMQP::Protocol::Channel::Open, 1],
[AMQP::Protocol::Connection::OpenOk, 1],
[AMQP::Protocol::Connection::Open, 1],
[MQ::Queue, 1],
[AMQP::Protocol::Connection::Tune, 1],
[AMQP::Protocol::Connection::TuneOk, 1],
[MQ, 1]]

The number is the number of objects.

Given that the code will continue to run from the point of fork in both
the child and parent is it possible that messages are being sent after
the fork? Is there anyway to close the connection to rabbitmq? I tried
AMQP.stop but that didn't do anything. Is there anything else I can try?

Aman Gupta

unread,
Nov 4, 2009, 3:55:26 AM11/4/09
to ruby...@googlegroups.com
Hmm. The reactor shuts down when the fork occurs so all open
connections are shut down. Perhaps the fork is generating the same
random numbers? You could try replacing the random number with a call
to SecureRandom.hex(16)

Aman

Richard Heycock

unread,
Nov 5, 2009, 2:21:39 PM11/5/09
to ruby-amqp
Excerpts from Aman Gupta's message of Wed Nov 04 19:55:26 +1100 2009:

Using SecureRandom.hex(16) produces exactly the same problem.

rgh

Reply all
Reply to author
Forward
0 new messages