I used simple test scripts to try to limit the outside factors, I would have used gist but having problems with them too :)

Publisher script:

    require 'rubygems'
    require 'mq'

    Signal.trap('INT') { AMQP.stop{ EM.stop } }
    Signal.trap('TERM'){ AMQP.stop{ EM.stop } }

    AMQP.start do
      10.times do |count|
        puts "step #{count}"
        id = rand(10)
        puts "Publishing id = #{id}"
        MQ.new.queue('pubsub').publish(id)
        sleep(1)
On Wed, Aug 5, 2009 at 10:20 PM, Marc Mauger <simia...@gmail.com> wrote:

> I used simple test scripts to try to limit the outside factors, I would have used gist but having problems with them too :)
>
> Publisher script:
>
>     require 'rubygems'
>     require 'mq'
>
>     Signal.trap('INT') { AMQP.stop{ EM.stop } }
>     Signal.trap('TERM'){ AMQP.stop{ EM.stop } }
>
>     AMQP.start do
>       10.times do |count|
>         puts "step #{count}"
>         id = rand(10)
>         puts "Publishing id = #{id}"
>         MQ.new.queue('pubsub').publish(id)
>         sleep(1)

This is really bad. Calling MQ.new creates a new channel each time and causes Erlang process leaks on the server. Also, calling sleep(1) blocks the reactor, so it doesn't actually get a chance to send out the packets.

What are you actually doing instead of the sleep(1)? Ideally you would not block at all and would do everything via the reactor (instead of sleep(1), use EM.add_timer, etc). Alternatively, you can use a Thread for publishing, which runs in parallel with the reactor and does not cause it to block.
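To make that concrete, here is a rough sketch of the test publisher with the two problems removed: the queue is created once and reused, and a reactor timer paces the loop instead of sleep(1). TickPublisher and run_with_amqp are made-up names for illustration, not part of the amqp gem. The wiring assumes that EM.add_periodic_timer returns a timer object that responds to cancel, and that a broker is running locally. The id is sent as a string here, which is also an assumption.

```ruby
# Sketch only: TickPublisher and run_with_amqp are hypothetical names.

# Publishes a fixed number of random ids, one per call to #tick,
# through a single queue object that is created once and reused.
class TickPublisher
  attr_reader :published

  def initialize(queue, total)
    @queue = queue      # anything that responds to #publish
    @total = total
    @published = 0
  end

  # Publish one id. Returns true once all messages have been sent.
  # Meant to be called from a reactor timer, so it never sleeps.
  def tick
    return true if done?
    id = rand(10)
    puts "step #{@published}: publishing id = #{id}"
    @queue.publish(id.to_s)
    @published += 1
    done?
  end

  def done?
    @published >= @total
  end
end

# Wiring into the reactor. Assumes the amqp gem is installed and a
# broker is reachable; nothing here runs until the method is called.
def run_with_amqp(total = 10)
  require 'rubygems'
  require 'mq'

  AMQP.start do
    queue = MQ.new.queue('pubsub')   # one channel for the whole run
    publisher = TickPublisher.new(queue, total)

    # The timer fires once a second without blocking the reactor.
    timer = EM.add_periodic_timer(1) do
      if publisher.tick
        timer.cancel
        AMQP.stop { EM.stop }
      end
    end
  end
end
```

Calling run_with_amqp(10) should behave like the original test loop, except that the reactor gets control back between publishes and so can flush each message as it goes.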
Hey Aman,

Actually, this is just test code that I am trying to keep as simple as possible. My real code uses a memoized function to return the queue for both publisher and subscriber.

Also, my working code doesn't have the sleep; I put it in this test so I could watch the output in my consoles without needing to add a 1000.times loop.
Otherwise, my code downloads emails from an IMAP server using the Ruby IMAP library, creates DB records with the email info, then publishes each new record id in a loop (some loops have 10K emails).
In my integration tests, which download a handful of emails, I tried wrapping the publish call in Thread.new, but the subscriber still didn't pick the messages up until the AMQP block ended, same as in this test.