How to publish immediately to queue inside long-running process


simianarmy

Aug 5, 2009, 4:15:04 PM
to AMQP
Hello, and sorry in advance for this simple question, but I have spent
3 days trying just about everything, including switching from
RabbitMQ 1.5 to 1.6.

Some history: I have used amqp + eventmachine quite successfully in
many situations without (too many) problems. I have multiple
independent processes & daemons (Rails, DaemonKit, cron, etc.) that all
communicate over different amqp exchange types.

But one seemingly simple problem I cannot figure out. Some of my jobs
require very long processing times, publishing tens of thousands of
messages to a queue, to be handled by separate daemons/processes,
pretty standard. But no matter what I try inside the long running
daemon, all the messages that I publish don't get sent or are not seen
by the subscriber process until amqp.stop { em.stop } is called by the
publisher.

I have read in a few places that this is normal behavior for amqp.
Yet in all the examples I have seen, published jobs reach the
subscribers immediately. Are these examples too simplistic, in that
their publishers all end quickly (calling em.stop) or all use
em.add_timer & em.add_periodic_timer?

I can see that while the publisher is running, rabbitmqctl doesn't
show any messages in the queue. When it ends, they all get sent at
once to the subscriber (which uses :ack => true).

Can you tell me if I am wasting my time trying to send messages that
can be processed in parallel by a subscriber without calling em.stop
in the publisher process?

If so, what is the appropriate way to send them? I have tried
add_timer { ... } but this didn't work. I also tried the prefetch(1)
method in the subscriber...didn't work.

Since the publisher is a daemon running em, should I set up periods
where I stop EM and restart it?

So confused...would love some clarification on this strange behavior.

Here's my daemon:
amqp.start {
  ... publish 10,000 jobs
}

Here's my subscriber process:
amqp.start {
  queue.subscribe {
    ....
  }
}

Daniel DeLeo

Aug 5, 2009, 4:25:48 PM
to ruby...@googlegroups.com
I'm still learning the ins and outs of EventMachine, so I hope someone will correct or clarify if I'm wrong, but to my understanding, EventMachine needs its thread (so your app if you only have 1 thread) to be in something like an IO wait state before it does its network IO. If your processes are all working at maximum capacity, EventMachine keeps deferring IO, giving the behavior you see. Unfortunately, you can't use sleep() to force EM to take priority, either.

Dan DeLeo

simianarmy

Aug 5, 2009, 10:18:58 PM
to AMQP
Yeah, doesn't look possible. I tried a few things, but my knowledge
of EM is limited.
I tried EM.defer, EM.spawn, EM.add_timer, even adding to
EM.message::queue & running a queue pop block (also inside a
periodic_timer)...but unless my test wrappers are to blame,
the messages won't get sent till that em.stop is called...

Oh well, nothing to do but make the exchange durable in case the
subscriber dies while processing the flood of incoming jobs.

Aman Gupta

Aug 5, 2009, 11:30:16 PM
to ruby...@googlegroups.com
What are you doing that's causing the reactor to block? Can you give an example of one of your long running jobs?

  Aman

Marc Mauger

Aug 6, 2009, 1:20:39 AM
to ruby...@googlegroups.com
I used simple test scripts to try to limit the outside factors. I would have used a gist, but I am having problems with those too :)

Publisher script:

require 'rubygems'
require 'mq'

Signal.trap('INT') { AMQP.stop{ EM.stop } }
Signal.trap('TERM'){ AMQP.stop{ EM.stop } }

AMQP.start do
  10.times do |count|
    puts "step #{count}"
    id = rand(10)
    puts "Publishing id = #{id}"
    MQ.new.queue('pubsub').publish(id)
    sleep(1)
  end
  puts "Finished loop, waiting for stop"
end

Subscriber script:

require 'rubygems'
require 'mq'

Signal.trap('INT') { AMQP.stop{ EM.stop } }
Signal.trap('TERM'){ AMQP.stop{ EM.stop } }

AMQP.start do
  MQ.new.queue('pubsub').subscribe(:ack => true) do |head, msg|
    puts "got #{msg.inspect}"
    head.ack
  end
end

This is as simple as I could make it. I run each script in its own console window to watch the output. The publisher prints its "Publishing" text once per second.
The subscriber dumps its "got <id>" strings only after the publisher reaches the end of its block.


Regards,
Marc Mauger




Aman Gupta

Aug 6, 2009, 1:27:59 AM
to ruby...@googlegroups.com
On Wed, Aug 5, 2009 at 10:20 PM, Marc Mauger <simia...@gmail.com> wrote:
I used simple test scripts to try to limit the outside factors, I would have used gist but having problems with them too :)

Publisher script:

require 'rubygems'
require 'mq'

Signal.trap('INT') { AMQP.stop{ EM.stop } }
Signal.trap('TERM'){ AMQP.stop{ EM.stop } }

AMQP.start do
        10.times do |count|
          puts "step #{count}"
          id = rand(10)
          puts "Publishing id = #{id}"
 
          MQ.new.queue('pubsub').publish(id)
          sleep(1) 

This is really bad. Calling MQ.new creates a new channel each time and causes Erlang process leaks on the server. Calling sleep(1) also blocks the reactor, so it never gets a chance to send out the packets.

What are you actually doing instead of the sleep(1)? Ideally you would not block at all, and would do everything via the reactor (instead of sleep(1), use EM.add_timer, etc.). Alternatively, you can use a Thread for publishing, which runs in parallel with the reactor and does not cause it to block.
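
A rough way to picture the point above is a toy loop in which publishing only appends to an outbound buffer, and the buffer is written out once the running block returns control. This is a simplified sketch using only plain Ruby: ToyReactor and all of its methods are hypothetical names made up for illustration, not EventMachine's or the amqp gem's API.

```ruby
# Toy model of a reactor: "publish" only queues data, and the queue is
# flushed when the loop regains control. Hypothetical, not EventMachine.
class ToyReactor
  attr_reader :wire

  def initialize
    @outbound = []   # data queued by publish, not yet sent
    @wire = []       # data that has actually been "sent"
    @callbacks = []  # blocks scheduled to run on a later tick
  end

  def publish(msg)
    @outbound << msg
  end

  def next_tick(&block)
    @callbacks << block
  end

  # One loop iteration: run one scheduled block, then flush the buffer.
  def tick
    callback = @callbacks.shift
    callback.call if callback
    @wire.concat(@outbound)
    @outbound.clear
  end

  def run
    tick until @callbacks.empty? && @outbound.empty?
  end
end

reactor = ToyReactor.new
sent_during_loop = nil

reactor.next_tick do
  3.times { |i| reactor.publish("job #{i}") }  # stands in for the long loop
  sent_during_loop = reactor.wire.size         # nothing has been flushed yet
end
reactor.run

puts "sent while the block was running: #{sent_during_loop}"
puts "sent after the block returned: #{reactor.wire.size}"
```

In this model, anything that keeps the block running (a sleep, a long loop, blocking I/O) delays the flush by the same amount, which matches the symptom of messages arriving only once the publisher's block ends.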

  Aman

Aman Gupta

Aug 6, 2009, 1:29:44 AM
to ruby...@googlegroups.com
You can also use http://github.com/famoseagle/carrot or http://github.com/celldee/bunny for publishing messages.

  Aman

Marc Mauger

Aug 6, 2009, 1:40:01 AM
to ruby...@googlegroups.com
Hey Aman,

Actually this is just testing code that I am trying to keep as simple as possible- my code uses a memoized function to return the queue for 
both publisher & subscriber.
Also, my working code doesn't have sleep, but I put it in this test so I could watch the output in my consoles without needing to add a 1000.times loop.

Otherwise, my code is downloading emails from an IMAP server using the Ruby imap library, creating db records with the email info, then publishing the new record id in a loop (some loops have 10K emails).

In my integration tests that download a handful of emails, I tried wrapping the publish call in Thread.new...but this didn't cause the subscriber to pick them up till the amqp block ended, as in this test.

Sorry just didn't want to publish my rather large real example :)
Regards,
Marc Mauger




Marc Mauger

Aug 6, 2009, 1:43:27 AM
to ruby...@googlegroups.com
Carrot sounds excellent for my purposes, though - I will certainly give it a try. Thanks!

Regards,
Marc Mauger




Aman Gupta

Aug 6, 2009, 1:49:45 AM
to ruby...@googlegroups.com
On Wed, Aug 5, 2009 at 10:40 PM, Marc Mauger <simia...@gmail.com> wrote:
Hey Aman,

Actually this is just testing code that I am trying to keep as simple as possible- my code uses a memoized function to return the queue for 
both publisher & subscriber.
Also, my working code doesn't have sleep, but I put it in this test so I could watch the output in my consoles without needing to add a 1000.times loop.

Yes, calling 1000.times{} will also cause this issue since it will block the reactor. To work around this you need to manually break up those iterations over multiple reactor ticks. Future versions of EM will contain EM::Iterator to make this easier, but usually this pattern looks like:

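items = (0..1000).to_a
handle_item = proc{
  if i = items.shift
    process(i)                  # placeholder for the per-item work
    EM.next_tick(handle_item)   # schedule the next item on a later tick
  end
}
EM.next_tick(handle_item)       # start the chain from inside the reactor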

This basically processes one item from the array per reactor tick.
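
To see why one item per tick helps, here is a small self-contained sketch in plain Ruby. The tick loop, the outbound buffer and the flush log are hypothetical stand-ins for the reactor, not EventMachine itself. Pushing the proc back onto the list plays the role of EM.next_tick(handle_item).

```ruby
# Toy tick loop (hypothetical, not EventMachine): each tick runs one
# scheduled block, then flushes whatever was published during it.
ticks = []       # blocks waiting to run, like procs given to EM.next_tick
outbound = []    # published but not yet flushed
flushes = []     # one entry per flush that actually sent something

items = (1..6).to_a
handle_item = proc do
  if (item = items.shift)
    outbound << item        # stands in for process(i) plus a publish
    ticks << handle_item    # reschedule for the next tick
  end
end

ticks << handle_item        # kick off the first iteration
until ticks.empty?
  ticks.shift.call
  flushes << outbound.dup unless outbound.empty?
  outbound.clear
end

p flushes
```

Each item is flushed on its own tick instead of all six going out together at the end, which is the behavior the original poster was after.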
 

Otherwise, my code is downloading emails from an IMAP server using the Ruby imap library, creating db records with the email info, then publishing the new record id in a loop (some loops have 10K emails).

The Ruby imap library uses blocking I/O. Ideally you would use a non-blocking IMAP library with EM, but you'd have to write it first and then rewrite all your code to be asynchronous =\
 

In my integration tests that download a handful of emails, I tried wrapping the publish call in Thread.new...but this didn't cause the subscriber to pick them up till the amqp block ended, as in this test.

You can't just call publish from a new thread. All of the blocking behavior should happen in the thread:

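AMQP.start do
  # Create the channel and queue once, outside the loop
  # ('pubsub' is the queue name from the test scripts earlier in the thread).
  queue = MQ.new.queue('pubsub')

  Thread.new do
    1000.times do |i|
      queue.publish(i)
      sleep(1)  # blocks only this thread, not the reactor
    end
  end
end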

This should work, but really you're better off just using a synchronous library like carrot or bunny.
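
The general shape of the thread approach can be sketched with the standard library alone: a worker thread does the slow, blocking work and hands results over through a thread-safe Queue, while the main loop stays free to send. Everything here is illustrative. The sent array stands in for the network, the sleep stands in for a blocking IMAP fetch, and nothing in it shows whether a particular AMQP client's publish call is safe to use across threads.

```ruby
require 'thread'  # Queue is built in on current Rubies; harmless to require

outbox = Queue.new  # thread-safe hand-off between worker and main loop
sent = []           # stands in for messages that reached the broker

worker = Thread.new do
  5.times do |i|
    sleep(0.01)     # stands in for a blocking fetch or database write
    outbox << i
  end
  outbox << :done   # sentinel so the main loop knows when to stop
end

# Main loop: plays the role of the reactor. The worker's sleep never
# stalls it; it picks up each item as soon as the worker hands it over.
loop do
  msg = outbox.pop
  break if msg == :done
  sent << msg
end
worker.join

p sent
```

The design point is the same as in the reply above: the blocking calls live entirely inside the worker thread, so the loop responsible for sending never waits on them.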

  aman

Marc Mauger

Aug 6, 2009, 2:00:25 AM
to ruby...@googlegroups.com
Thanks a lot for the clarifications, Aman, I appreciate it. I think you're right: it probably makes more sense to try the synchronous approach for these long-running (& blocking) loops. I already had to use SystemTimer to handle timeouts in the IMAP interactions, and I don't think I want to add Ruby Threads in there as well...above my pay grade.

Thanks again, it is nice to know there are options!

Regards,
Marc Mauger




simianarmy

Aug 12, 2009, 7:14:37 PM
to AMQP
Tried the Thread solution, works beautifully! Finally some async in
my mq world!
