i have a problem using this library. just using 2 simple ruby
processes:
producer:
require 'rubygems'
require 'mq'
EM.run do
  q = MQ.new
  1000.times do
    q.queue('test').publish('foo')
  end
end
consumer:
require 'rubygems'
require 'mq'
EM.run do
  q = MQ.new
  q.queue('test').subscribe do |msg|
    puts msg
    sleep 1
  end
end
i see that as soon as the consumer starts, it takes all 1000 messages
off the queue. if i ctrl-c out of that process, the messages are gone
from rabbit. if i try to use an ack version:
require 'rubygems'
require 'mq'
EM.run do
  q = MQ.new
  q.queue('test').subscribe(:ack => true) do |header, msg|
    puts msg
    header.ack
    sleep 1
  end
end
the messages never get acked (after a ctrl-c) and stay on the queue
between restarts. i'm using amqp-0.6 and rabbit 1.5.0.
The ack version works as expected, correct?
When you subscribe, rabbit starts sending you messages off of the queue
until the TCP buffer fills up. Since this buffer is pretty big
(especially compared to the size of each message), most of the
messages end up getting dequeued and sent to the consumer. When you
ctrl+c and aren't using ack mode, these messages are effectively lost.
The solution is to use ack mode, or to use MQ.pop so you explicitly
ask for a message to be dequeued.
1.5 also contains some new flow control features specifically for this
problem. They should be trivial to add to the library as soon as I
figure out how to use them.
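A rough way to picture the difference between the two subscribe modes is the toy model below, in plain Ruby. ToyQueue and messages_left_after_crash are made-up names for illustration only; they are not part of the amqp gem or RabbitMQ. The model also assumes the broker pushes every ready message at once, whereas the real broker stops when the TCP buffer fills.

```ruby
# Toy model of a broker queue; not the amqp gem or RabbitMQ itself.
class ToyQueue
  attr_reader :ready, :unacked

  def initialize(messages)
    @ready = messages.dup   # messages still sitting on the queue
    @unacked = {}           # delivery tag => message (ack mode only)
    @next_tag = 0
  end

  # Push every ready message to the consumer, as a subscribe does when
  # nothing limits the flow. Returns [tag, message] pairs.
  def deliver_all(ack:)
    deliveries = []
    until @ready.empty?
      msg = @ready.shift
      tag = (@next_tag += 1)
      @unacked[tag] = msg if ack   # without ack mode the broker forgets it
      deliveries << [tag, msg]
    end
    deliveries
  end

  def ack(tag)
    @unacked.delete(tag)
  end

  # The consumer's connection went away: unacked messages are requeued.
  def consumer_died
    @ready.concat(@unacked.values)
    @unacked.clear
  end
end

# Deliver 1000 messages, let the consumer handle a few, then kill it.
# Returns how many messages are back on the queue afterwards.
def messages_left_after_crash(ack:, processed:)
  queue = ToyQueue.new(Array.new(1000) { 'foo' })
  deliveries = queue.deliver_all(ack: ack)
  deliveries.first(processed).each { |tag, _msg| queue.ack(tag) if ack }
  queue.consumer_died
  queue.ready.size
end
```

In this sketch a consumer killed after three messages leaves nothing on the queue without ack mode, and leaves every message it had not acked on the queue with ack mode.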
On Wed, Feb 11, 2009 at 7:20 PM, amos <famosea...@gmail.com> wrote:
> i see that as soon as the consumer starts, it takes all 1000 messages
> off the queue. if i ctrl-c out of that process, the messages are gone
> from rabbit.
Aman,
On Thu, Feb 12, 2009 at 3:32 AM, Aman Gupta <themastermi...@gmail.com> wrote:
> 1.5 also contains some new flow control features specifically for this
> problem. They should be trivial to add to the library as soon as I
> figure out how to use them.
You might be interested in the basic.qos command that allows a
consumer to set its own prefetch window size.
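For illustration, here is a toy model of what a prefetch window does: in ack mode, with basic.qos the broker stops delivering once prefetch_count messages are outstanding without an ack, and each ack frees one slot. WindowedQueue and prefetch_demo are hypothetical names; this is plain Ruby and does not show how to issue basic.qos from the amqp gem.

```ruby
# Toy model of a prefetch window; not RabbitMQ or the amqp gem.
class WindowedQueue
  attr_reader :ready, :unacked

  def initialize(messages, prefetch_count)
    @ready = messages.dup
    @unacked = {}                     # delivery tag => message
    @prefetch_count = prefetch_count
    @next_tag = 0
  end

  # Deliver messages until the window of unacked deliveries is full.
  # Returns the [tag, message] pairs sent in this round.
  def deliver
    sent = []
    while !@ready.empty? && @unacked.size < @prefetch_count
      tag = (@next_tag += 1)
      @unacked[tag] = @ready.shift
      sent << [tag, @unacked[tag]]
    end
    sent
  end

  # An ack frees one slot in the window.
  def ack(tag)
    @unacked.delete(tag)
  end
end

# Returns [size of first delivery, size of delivery after one ack,
# messages still on the queue].
def prefetch_demo(total: 1000, prefetch_count: 10)
  queue = WindowedQueue.new(Array.new(total) { 'foo' }, prefetch_count)
  first_round = queue.deliver
  queue.ack(first_round.first.first)   # ack the first delivery tag
  second_round = queue.deliver
  [first_round.size, second_round.size, queue.ready.size]
end
```

With a window of 10, a consumer that dies holds at most 10 messages, and the rest never leave the queue.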
On Thu, Feb 12, 2009 at 12:31 AM, Ben Hood <0x6e6...@gmail.com> wrote:
> You might be interested in the basic.qos command that allows a
> consumer to set its own prefetch window size.
try running the ack mode. if you ctrl+c after a couple of lines of
'foo', you'll still see 1000 messages on the queue. is that because
the ack isn't synchronous? if you let the program run all the way to
the end, then the queue will be down to 0.
i was having trouble with pop(). how would you rewrite the consumer to
pop all 1000 messages?
On Feb 11, 7:32 pm, Aman Gupta <themastermi...@gmail.com> wrote:
> The solution is to use ack mode, or to use MQ.pop so you explicitly
> ask for a message to be dequeued.
You're using sleep(1) in your ack example, which blocks the reactor
and doesn't give the acks a chance to be sent over the network. Here's
an example using pop:
queue = MQ.new.queue('name')
queue.pop{ |msg|
  # if msg is nil, the queue was empty
  if msg
    process(msg)
  end
  # wait two seconds and pop another message off
  EM.add_timer(2){ queue.pop }
}
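To make the point about sleep(1) concrete, here is a small plain-Ruby sketch of why acks issued inside a blocking callback never reach the broker if the process is killed. It is a toy model under stated assumptions, not EventMachine's real internals: ToyConnection, Killed and acks_received_by_broker are invented names, and the model assumes writes are only queued during a callback and flushed once the callback returns.

```ruby
# Toy model of a reactor's write path; not EventMachine's real internals.
class ToyConnection
  attr_reader :wire

  def initialize
    @outbound = []   # frames queued by send_data, not yet written
    @wire = []       # frames the broker has actually received
  end

  # Like a non-blocking send: only queues the frame.
  def send_data(frame)
    @outbound << frame
  end

  # In this model the reactor flushes between callbacks, never during one.
  def flush
    @wire.concat(@outbound)
    @outbound.clear
  end
end

class Killed < StandardError; end   # stands in for ctrl-c

# One socket read hands the callback every message in the chunk; the
# reactor regains control (and flushes) only after the last one.
def acks_received_by_broker(messages, die_after: nil)
  conn = ToyConnection.new
  begin
    messages.each_with_index do |_msg, i|
      conn.send_data("ack #{i}")             # header.ack: queued only
      raise Killed if die_after == i + 1     # killed during the blocking work
    end
    conn.flush                               # callback returned normally
  rescue Killed
    # The process is gone; whatever was still queued is never written.
  end
  conn.wire.size
end
```

If the loop runs to the end, all acks are flushed; if it is interrupted part way, none are. That matches the behaviour reported earlier in the thread.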
On Thu, Feb 12, 2009 at 11:07 AM, amos <famosea...@gmail.com> wrote:
> i was having trouble with pop(). how would you rewrite the consumer to
> pop all 1000 messages?
thanks, i think pop will work better.
the sleep(1) in my example was just representing processing on our
production machines. so if instead of sleep, i fetch messages from
memcached, fetch records from db, and send email, the result will be
the same: no ack sent. is there a way to have ack block?
On Feb 12, 11:38 am, Aman Gupta <themastermi...@gmail.com> wrote:
> You're using sleep(1) in your ack example, which blocks the reactor
> and doesn't give the acks a chance to be sent over the network.
there are two solutions to this problem:
one is to use asynchronous libraries for everything such that there
are never any blocking actions. you can use em-mysql
(http://github.com/tmm1/em-mysql) to access a mysql database,
eventedcache (http://github.com/cliffmoon/eventedcache) for memcache
access and EM::P::SmtpClient to send emails.
the other is to use ruby threads to run your blocking code, such that
the main reactor can still continue to run alongside instead of
getting blocked as well. you can do this using EM.defer.
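The shape of the thread-based approach can be sketched in plain Ruby as follows. This is a stand-in built on Thread and Queue from the standard library, not EventMachine itself: ToyLoop and handle_messages are hypothetical names. The real EM.defer(op, callback) follows the same pattern, running op on a worker thread and then calling callback with its result on the reactor thread.

```ruby
require 'thread'

# Tiny stand-in for a reactor loop with an EM.defer-style helper.
class ToyLoop
  def initialize
    @results = Queue.new   # thread-safe hand-off from workers to the loop
    @pending = 0
  end

  # Run +op+ on a worker thread; +callback+ later runs on the loop
  # thread with op's return value.
  def defer(op, callback)
    @pending += 1
    Thread.new { @results << [callback, op.call] }
  end

  # Process finished work until nothing is outstanding.
  def run
    until @pending.zero?
      callback, result = @results.pop   # blocks until a worker finishes
      callback.call(result)
      @pending -= 1
    end
  end
end

def handle_messages(messages)
  reactor = ToyLoop.new
  acked = []
  messages.each do |msg|
    work = proc { sleep 0.01; msg.upcase }     # stand-in for blocking work
    done = proc { |result| acked << result }   # where header.ack would go
    reactor.defer(work, done)
  end
  reactor.run
  acked
end
```

Acking in the callback rather than in the worker keeps all connection writes on the loop thread, and means a message is only acked after its work has finished. The sketch does not handle exceptions raised inside a worker.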
On Thu, Feb 12, 2009 at 11:52 AM, amos <famosea...@gmail.com> wrote:
> is there a way to have ack block?
i guess what i'm saying is that acks caught in tcp buffer create a
dangerous situation when your client dies (or is manually stopped).
the work is done but never acked. when the buffer consists of 1000
acks, that's a lot of lost but completed work. i imagine that moving
to non-blocking libraries would minimize the delay, but it still seems
like you'll lose work. maybe i'm wrong. it's a difficult theory to
test without converting my entire application to use em.
i'm surprised more people don't run into this issue. i'm not doing
anything special with the client.
but thanks again for the advice. very helpful.
On Feb 12, 12:31 pm, Aman Gupta <themastermi...@gmail.com> wrote:
> one is to use asynchronous libraries for everything such that there
> are never any blocking actions.
the ack is only lost if you cause the reactor to block. another way to
mitigate this is by doing your blocking action inside an EM.next_tick,
which will happen after the EM buffers have been written to the
network.
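The original example for this message did not survive in the archive, so what follows is only a plain-Ruby sketch of the idea, not the author's code. TickLoop, Killed and acks_on_wire_when_killed are invented names, and the model takes the claim above as given: queued writes are flushed after each callback, before the next tick runs.

```ruby
# Toy reactor: runs queued callbacks one at a time and flushes pending
# writes after each. An assumption-laden model, not EventMachine.
class TickLoop
  attr_reader :wire

  def initialize
    @ticks = []      # callbacks waiting to run
    @outbound = []   # frames queued but not yet written
    @wire = []       # frames the broker has received
  end

  def next_tick(&blk)
    @ticks << blk
  end

  def send_data(frame)
    @outbound << frame
  end

  def run
    until @ticks.empty?
      @ticks.shift.call          # if this raises, the flush below is skipped
      @wire.concat(@outbound)
      @outbound.clear
    end
  end
end

class Killed < StandardError; end   # stands in for ctrl-c

# Ack each message right away, but push the blocking work to a later tick.
# Returns the acks the broker had received when the process was killed.
def acks_on_wire_when_killed(messages, die_on:)
  reactor = TickLoop.new
  reactor.next_tick do                       # one read delivers the whole chunk
    messages.each_with_index do |_msg, i|
      reactor.send_data("ack #{i}")          # header.ack: queued only
      reactor.next_tick { raise Killed if i == die_on }   # deferred work
    end
  end
  begin
    reactor.run
  rescue Killed
    # process killed during the deferred work
  end
  reactor.wire
end
```

In this model every ack is on the wire before any deferred work starts, so a kill no longer strands them. The flip side is that a message killed mid-work has already been acked, so the broker will not redeliver it.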
On Fri, Feb 13, 2009 at 12:57 PM, amos <famosea...@gmail.com> wrote:
> i guess what i'm saying is that acks caught in tcp buffer create a
> dangerous situation when your client dies (or is manually stopped).
pop seems funky (rabbit crashed a couple of times) so i'm going back
to this method. unfortunately, i still haven't figured out how to
flush the ack buffer. i end up processing more messages than the
server recognizes. is there some stop command that will ensure that
all acks are sent to the server before the client quits? EM.stop and
AMQP.stop still leave messages un-acked.
On Feb 13, 1:04 pm, Aman Gupta <themastermi...@gmail.com> wrote:
> the ack is only lost if you cause the reactor to block.
On Wed, Mar 4, 2009 at 11:01 AM, amos <famosea...@gmail.com> wrote:
> is there some stop command that will ensure that
> all acks are sent to the server before the client quits?
On 4 Mar 2009, at 19:01, amos <famosea...@gmail.com> wrote:
> pop seems funky (rabbit crashed a couple of times)
If Rabbit crashed for some reason, it would be interesting (from a
server perspective) to be able to reproduce this, because the client
should ordinarily not be able to crash the broker. Do you have any
server logs at all? Is it possible to provide a stripped down version
of your program that can reproduce a crash?
> so i'm going back
> to this method. unfortunately, i still haven't figured out how to
> flush the ack buffer. i end up processing more messages than the
> server recognizes. is there some stop command that will ensure that
> all acks are sent to the server before the client quits? EM.stop and
> AMQP.stop still leave messages un-acked.
> On Feb 13, 1:04 pm, Aman Gupta <themastermi...@gmail.com> wrote:
>> the ack is only lost if you cause the reactor to block. another way to
>> mitigate this is by doing your blocking action inside an EM.next_tick,
>> which will happen after the EM buffers have been written to the network:
>> }
>> On Fri, Feb 13, 2009 at 12:57 PM, amos <famosea...@gmail.com> wrote:
>>> i guess what i'm saying is that acks caught in the tcp buffer create a
>>> dangerous situation when your client dies (or is manually stopped).
>>> the work is done but never acked. when the buffer consists of 1000
>>> acks, that's a lot of lost but completed work. i imagine that moving
>>> to non-blocking libraries would minimize the delay, but it still seems
>>> like you'll lose work. maybe i'm wrong. it's a difficult theory to
>>> test without converting my entire application to use em.
>>> i'm surprised more people don't run into this issue. i'm not doing
>>> anything special with the client.
>>> but thanks again for the advice. very helpful.
>>> On Feb 12, 12:31 pm, Aman Gupta <themastermi...@gmail.com> wrote:
>>>> there are two solutions to this problem:
>>>> one is to use asynchronous libraries for everything such that there
>>>> are never any blocking actions. you can use em-mysql
>>>> (http://github.com/tmm1/em-mysql) to access a mysql database,
>>>> eventedcache (http://github.com/cliffmoon/eventedcache) for memcache
>>>> access and EM::P::SmtpClient to send emails.
>>>> the other is to use ruby threads to run your blocking code, such that
>>>> the main reactor can still continue to run alongside instead of
>>>> getting blocked as well. you can do this using EM.defer:
>>>> On Thu, Feb 12, 2009 at 11:52 AM, amos <famosea...@gmail.com> wrote:
>>>>> thanks, i think pop will work better.
>>>>> the sleep(1) in my example was just representing processing on our
>>>>> production machines. so if instead of sleep, i fetch messages from
>>>>> memcached, fetch records from db, and send email, the result will be
>>>>> the same: no ack sent. is there a way to have ack block?
>>>>> On Feb 12, 11:38 am, Aman Gupta <themastermi...@gmail.com> wrote:
>>>>>> You're using sleep(1) in your ack example, which blocks the reactor
>>>>>> and doesn't give the acks a chance to be sent over the network.
>>>>>> Here's an example using pop:
>>>>>> queue = MQ.new.queue('name')
>>>>>> queue.pop{ |msg|
>>>>>>   # if msg is nil, the queue was empty
>>>>>>   if msg
>>>>>>     process(msg)
>>>>>>   end
>>>>>>   # wait two seconds and pop another message off
>>>>>>   EM.add_timer(2){ queue.pop }
>>>>>> }
>>>>>> Aman
>>>>>> On Thu, Feb 12, 2009 at 11:07 AM, amos <famosea...@gmail.com> wrote:
>>>>>>> try running the ack mode. if you ctrl+c after a couple of lines of
>>>>>>> 'foo', you'll still see 1000 messages on the queue. is that because
>>>>>>> the ack isn't synchronous? if you let the program run all the way to
>>>>>>> the end, then the queue will be down to 0.
>>>>>>> i was having trouble with pop(). how would you rewrite the consumer to
>>>>>>> pop all 1000 messages?
>>>>>>> On Feb 11, 7:32 pm, Aman Gupta <themastermi...@gmail.com> wrote:
>>>>>>>> The ack version works as expected, correct?
>>>>>>>> When you subscribe, rabbit starts sending you messages off of the queue
>>>>>>>> until the TCP buffer fills up.. since this buffer is pretty big
>>>>>>>> (especially compared to the size of each message), most of the
>>>>>>>> messages end up getting dequeued and sent to the consumer. When you
>>>>>>>> ctrl+c and aren't using ack mode, these messages are effectively lost.
>>>>>>>> The solution is to use ack mode, or to use MQ.pop so you explicitly
>>>>>>>> ask for a message to be dequeued.
>>>>>>>> 1.5 also contains some new flow control features specifically for this
>>>>>>>> problem. They should be trivial to add to the library as soon as I
>>>>>>>> figure out how to use them.
>>>>>>>> Aman
>>>>>>>> On Wed, Feb 11, 2009 at 7:20 PM, amos <famosea...@gmail.com> wrote:
>>>>>>>>> i have a problem using this library. just using 2 simple ruby
>>>>>>>>> processes:
>>>>>>>>> EM.run do
>>>>>>>>>   q = MQ.new
>>>>>>>>>   q.queue('test').subscribe do |msg|
>>>>>>>>>     puts msg
>>>>>>>>>     sleep 1
>>>>>>>>>   end
>>>>>>>>> end
>>>>>>>>> i see that as soon as the consumer starts, it takes all 1000 messages
>>>>>>>>> off the queue. if i ctrl-c out of that process, the messages are gone
>>>>>>>>> from rabbit. if i try to use an ack version:
>>>>>>>>> EM.run do
>>>>>>>>>   q = MQ.new
>>>>>>>>>   q.queue('test').subscribe(:ack => true) do |header, msg|
>>>>>>>>>     puts msg
>>>>>>>>>     header.ack
>>>>>>>>>     sleep 1
>>>>>>>>>   end
>>>>>>>>> end
>>>>>>>>> the messages never get acked (after a ctrl-c) and stay on the queue
>>>>>>>>> between restarts. i'm using amqp-0.6 and rabbit 1.5.0.
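A toy model of the explanation quoted above may help show why a blocking handler keeps acks off the wire. It is plain Ruby, not EventMachine or the amqp gem: ToyReactor, send_frame, wire and SimulatedCtrlC are made-up names, and it assumes that a batch of deliveries is handled in one loop iteration and that buffered writes are flushed only when control returns to the loop.

```ruby
# Toy model only: this is not EventMachine or the amqp gem.
class SimulatedCtrlC < StandardError; end

class ToyReactor
  attr_reader :wire

  def initialize
    @outbound = []    # frames queued by handlers, not yet written
    @wire = []        # frames the broker has actually received
    @next_tick = []   # blocks to run after the next flush
  end

  # Queue a frame; nothing reaches the wire until the loop flushes.
  def send_frame(frame)
    @outbound << frame
  end

  def next_tick(&blk)
    @next_tick << blk
  end

  # One loop iteration: handle every delivery in the batch, flush the
  # buffered writes, then run whatever was scheduled with next_tick.
  def tick(batch, &handler)
    batch.each(&handler)
    flush
    pending, @next_tick = @next_tick, []
    pending.each(&:call)
  end

  private

  def flush
    @wire.concat(@outbound)
    @outbound.clear
  end
end

def run_until_killed(reactor, batch, &handler)
  reactor.tick(batch, &handler)
rescue SimulatedCtrlC
  # process killed mid-work; whatever is still buffered is lost
end

# Case 1: ack, then block inside the handler; killed while working on 'b'.
blocking = ToyReactor.new
run_until_killed(blocking, %w[a b c]) do |msg|
  blocking.send_frame("ack #{msg}")
  raise SimulatedCtrlC if msg == 'b'   # stands in for ctrl-c during sleep
end

# Case 2: ack, then push the slow work to next_tick.
deferred = ToyReactor.new
run_until_killed(deferred, %w[a b c]) do |msg|
  deferred.send_frame("ack #{msg}")
  deferred.next_tick { raise SimulatedCtrlC if msg == 'b' }
end

puts blocking.wire.inspect
puts deferred.wire.inspect
```

Under these assumptions the blocking consumer leaves nothing on the wire, even though 'a' was fully processed and acked. The next_tick variant gets all three acks out, but before the deferred work has finished, so a kill at that point leaves acked messages that were never processed.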
it happened 2x when the client machines were under heavy load (2x
number of cores). i'm speaking with alexis richardson about analyzing
the log files. i'll let you know what we discover. simple client code
would be something like:
queue = mq.queue(queue_name, :durable => true)
queue.pop(:ack => true) do |info, task|
  if task
    task = Marshal.load(task)
    process(task)
    info.ack
  end
  EM.add_timer(0.001){ queue.pop }
end
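On the question of stopping without losing acks, one possible shape is to have ctrl-c set a flag that the pop loop checks between tasks. This is sketched here as a toy model, not a tested recipe for amqp-0.6. DrainingConsumer, request_stop and flush_acks are hypothetical names, and no EventMachine or amqp calls are made.

```ruby
# Toy model only: plain Ruby, no EventMachine or amqp calls.
class DrainingConsumer
  attr_reader :processed, :acked_on_wire, :remaining

  def initialize(tasks)
    @remaining = tasks.dup
    @processed = []
    @buffered_acks = []
    @acked_on_wire = []
    @stop_requested = false
  end

  # A signal handler would call this instead of exiting right away,
  # e.g. Signal.trap('INT') { consumer.request_stop }
  def request_stop
    @stop_requested = true
  end

  # Pop one task at a time; the stop flag is only checked between tasks,
  # after the ack for the previous task has been flushed.
  def run
    until @stop_requested || @remaining.empty?
      task = @remaining.shift
      yield task
      @processed << task
      @buffered_acks << task
      flush_acks
    end
  end

  private

  def flush_acks
    @acked_on_wire.concat(@buffered_acks)
    @buffered_acks.clear
  end
end

consumer = DrainingConsumer.new(%w[t1 t2 t3 t4])
consumer.run do |task|
  consumer.request_stop if task == 't2'   # stop requested while t2 is in progress
end

puts consumer.processed.inspect
puts consumer.acked_on_wire.inspect
puts consumer.remaining.inspect
```

In this model the processed list and the acked list always match at exit, and the untouched tasks stay queued for the next run. Whether the real client flushes its buffer before the process exits would still need to be checked against the library.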
On Mar 5, 1:40 am, Ben Hood <0x6e6...@gmail.com> wrote:
> On 4 Mar 2009, at 19:01, amos <famosea...@gmail.com> wrote:
> > pop seems funky (rabbit crashed a couple of times)
> If Rabbit crashed for some reason, it would be interesting (from a
> server perspective) to be able to reproduce this, because the client
> should ordinarily not be able to crash the broker. Do you have any
> server logs at all? Is it possible to provide a stripped down version
> of your program that can reproduce a crash?
> Ben