I'm trying to get amqp to do the right thing when a process forks but I'm
having a few problems. The following code sort of works:
require 'mq'
AMQP.start do
MQ.queue('manage').subscribe { |h,m|
puts "[%s] %s: %s\n" % [Process.pid, :manage, m]
}
EM.fork_reactor {
MQ.queue('test').subscribe { |h,m|
puts "[%s] %s: %s\n" % [Process.pid, :test, m]
}
}
end
The first problem is that warnings are being printed to stderr:
NOT_ALLOWED - attempt to reuse consumer tag 'manage-692880059282' in AMQP::Protocol::Basic::Consume
NOT_ALLOWED - attempt to reuse consumer tag 'manage-385705733414' in AMQP::Protocol::Basic::Consume
I've done a google search on this message but there is less than one
page of results that don't give much insight into the problem.
The error happens in self.release_machine method in fork_reactor. Not
sure if that helps!
The second problem is that if I send a message to the parent process
then it gets delivered to the parent first and then to the client and so
on. If I monkey patch EM.fork_reactor to include the following it works
but it doesn't strike me as the right thing to do:
ObjectSpace.each_object(MQ) { |a| a.queues.each { |name,q| q.delete } }
I've tried numerous other things but I ended up going round in circles
without really knowing what I was doing.
Is this the correct thing to do?
rgh
That gives the same "attempt to reuse consumer" errors. AMQP.fork seems to be
a wrapper around EM.fork that sets Thread.current[:mq] = nil and sets the
@conn object to nil. I've already tried to set the @conn object to nil
but that did work either.
rgh
That's the thing, I'm not. If you paste the attached code into irb it
will produce the errors.
I notice that if I print out the process id along with the consumer
tag the errors have different pids. Also if I dump the object space in
fork_reactor there are a number of AMQP related objects open:
[[AMQP::Buffer, 15],
[AMQP::Frame::Method, 7],
[AMQP::Buffer::Overflow, 2],
[AMQP::Protocol::Connection::StartOk, 1],
[AMQP::Protocol::Connection::Start, 1],
[AMQP::Protocol::Channel::Open, 1],
[AMQP::Protocol::Connection::OpenOk, 1],
[AMQP::Protocol::Connection::Open, 1],
[MQ::Queue, 1],
[AMQP::Protocol::Connection::Tune, 1],
[AMQP::Protocol::Connection::TuneOk, 1],
[MQ, 1]]
The number is the number of objects.
Given that the code will continue to run from the point of fork in both
the child and parent is it possible that messages are being sent after
the fork? Is there anyway to close the connection to rabbitmq? I tried
AMQP.stop but that didn't do anything. Is there anything else I can try?
Using SecureRandom.hex(16) produces exactly the same problem.
rgh