Why is my queue acting like a LIFO? I want FIFO!


Henry

Oct 5, 2010, 2:17:44 PM
to AMQP
I have a pretty basic set up with one exchange and one queue:

AMQP.start(:user => env_config["user"], :pass => env_config["pass"],
           :host => env_config["host"], :vhost => env_config["vhost"]) do
  queue = MQ.queue(env_config['queue'])
  exchange = MQ.topic(env_config['topic'])
  queue.bind(exchange, :key => env_config['listen_key'])

  queue.subscribe do |header, body|
    # ..... processing stuff here....
  end
end

It seems that when we have a long-running (blocking) process and
messages get backed up on the queue, then once the long-running
process has finished, the subsequent messages it processes are the
newer messages rather than the older ones that should have come in
first. (We are not using EM.defer because we use JRuby.) Isn't the
default queue supposed to be FIFO?

Aman Gupta

Oct 5, 2010, 2:52:42 PM
to ruby...@googlegroups.com
RabbitMQ and amqp make no guarantees about the ordering of messages on a queue.

  Aman




Homan Chou

Oct 6, 2010, 1:27:27 PM
to ruby...@googlegroups.com
Really? So is there no way to emulate FIFO behavior?

Vlad Tepes

Oct 6, 2010, 2:00:56 PM
to ruby...@googlegroups.com
Does it help if you set prefetch to 1 & require acks? I have a completely unsubstantiated theory that you might be temporarily "losing" messages in a buffer, then receiving them after later messages that were not "lost" in the buffer.

Just a thought...I appear to get FIFO behavior all the time except for messages that are sent a second time due to a lack of "ack", but I can't swear to my notion that this is what's happening for you.
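
To make the redelivery point concrete, here is a toy, in-process sketch of it. Nothing in it talks to RabbitMQ or uses the amqp gem: ToyBroker and consume_all are hypothetical names made up for this illustration, and the requeue step is only a rough stand-in for what a real broker does with un-acked messages. It just shows why a message that is delivered a second time necessarily shows up after messages that were already delivered.

```ruby
# Toy model only: ToyBroker is a made-up stand-in, not an amqp gem class.
class ToyBroker
  def initialize(messages)
    @ready = messages.dup   # messages waiting on the queue, oldest first
    @unacked = []           # delivered but not yet acknowledged
  end

  # Hand the oldest ready message to the consumer and track it as un-acked.
  def deliver
    msg = @ready.shift
    @unacked << msg unless msg.nil?
    msg
  end

  def ack(msg)
    @unacked.delete(msg)
  end

  # Assumption of this model: un-acked messages become ready again.
  def requeue_unacked
    @ready = @unacked + @ready
    @unacked = []
  end
end

# Consume everything, "forgetting" to ack one message the first time it is seen.
def consume_all(broker, skip_ack_for)
  seen = []
  skipped = false
  loop do
    msg = broker.deliver
    if msg.nil?
      broker.requeue_unacked   # nothing ready: let un-acked messages come back
      msg = broker.deliver
      break if msg.nil?
    end
    seen << msg
    if msg == skip_ack_for && !skipped
      skipped = true           # first delivery: no ack
    else
      broker.ack(msg)
    end
  end
  seen
end

order = consume_all(ToyBroker.new([1, 2, 3, 4]), 2)
puts order.inspect   # => [1, 2, 3, 4, 2]
```

Message 2 is seen twice, and its second delivery comes after 3 and 4, which is the only kind of out-of-order delivery described above.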

Homan Chou

Oct 6, 2010, 2:43:37 PM
to ruby...@googlegroups.com
Interesting theory. I'll give it a try. Off the top of your head do
you have an example or know the syntax for requiring an ack or setting
up prefetch? I've just been doing the plain
@exchange.publish("message", :routing_key => ...) business from some
examples I found.

Thanks.

Vlad Tepes

Oct 6, 2010, 2:58:37 PM
to ruby...@googlegroups.com
The publisher side shouldn't change, but here's a heavily-redacted example of consumer code...I didn't actually run it, because it doesn't do anything, but it should show you the basic syntax. Choice of direct/topic/fanout exchange shouldn't affect the syntax. My actual code uses durable queues & persistent messages, but I doubt it matters either.

require 'mq'

AMQP.start do
  MQ.prefetch 1
  queue = MQ.queue('test-queue')
  queue.bind(MQ.fanout('test-exchange')).subscribe(:ack=>true) do |head,msg|
    # do stuff
    head.ack
  end
end

Homan Chou

Oct 8, 2010, 6:15:15 PM
to ruby...@googlegroups.com
I tried to reproduce the LIFO behavior using a simpler producer and
subscriber, with a 10 sec sleep time on the subscriber while the
producer produces messages every second, and lo and behold, I can't
get anything but FIFO out of this test. That leads me to believe
my problem is probably something in the production code, so it'll
take me a while to figure out. Thanks for all the help.
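
The shape of that test can be sketched without a broker, using Ruby's thread-safe Queue as a stand-in for the RabbitMQ queue. This is only an in-process model with the timings scaled down from seconds to hundredths of a second, not the actual producer and subscriber scripts. The names broker_queue and consumed, and the :done marker, are assumptions made for the sketch.

```ruby
require 'thread'

broker_queue = Queue.new   # hypothetical stand-in for the RabbitMQ queue
consumed = []              # order in which the slow consumer saw messages

# Producer: one message per tick, much faster than the consumer can keep up.
producer = Thread.new do
  10.times do |i|
    broker_queue << i
    sleep 0.01
  end
  broker_queue << :done    # marker so the consumer knows when to stop
end

# Consumer: blocks on each message, so messages back up on the queue.
consumer = Thread.new do
  loop do
    msg = broker_queue.pop
    break if msg == :done
    sleep 0.03             # simulated long-running processing
    consumed << msg
  end
end

[producer, consumer].each(&:join)
puts consumed.inspect      # => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

A backlog on its own does not reorder anything in this model either, which matches what the simple test showed.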