Gmail Calendar Documents Reader Web more »
Recently Visited Groups | Help | Sign in
Google Groups Home
subscribe gobbles messages
There are currently too many topics in this group that display first. To make this topic appear first, remove this option from another topic.
There was an error processing your request. Please try again.
flag
  14 messages - Collapse all  -  Translate all to Translated (View all originals)
The group you are posting to is a Usenet group. Messages posted to this group will make your email address visible to anyone on the Internet.
Your reply message has not been sent.
Your post was successful
 
From:
To:
Cc:
Followup To:
Add Cc | Add Followup-to | Edit Subject
Subject:
Validation:
For verification purposes please type the characters you see in the picture below or the numbers you hear by clicking the accessibility icon. Listen and type the numbers you hear
 
amos  
View profile  
 More options Feb 11, 10:20 pm
From: amos <famosea...@gmail.com>
Date: Wed, 11 Feb 2009 19:20:24 -0800 (PST)
Local: Wed, Feb 11 2009 10:20 pm
Subject: subscribe gobbles messages
i have a problem using this library. just using 2 simple ruby
processes:

producer:

require 'rubygems'
require 'mq'

EM.run do
 q = MQ.new
 1000.times do
   q.queue('test').publish('foo')
 end
end

consumer:

require 'rubygems'
require 'mq'

EM.run do
 q = MQ.new
 q.queue('test').subscribe do |msg|
   puts msg
   sleep 1
 end
end

i see that as soon as the consumer starts, it takes all 1000 messages
off the queue. if i ctrl-c out of that process, the messages are gone
from rabbit. if i try to use an ack version:

require 'rubygems'
require 'mq'

EM.run do
 q = MQ.new
 q.queue('test').subscribe(:ack => true) do |header, msg|
   puts msg
   header.ack
   sleep 1
 end
end

the messages never get acked (after a ctrl-c) and stay on the queue
between restarts. i'm using amqp-0.6 and rabbit 1.5.0.


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Aman Gupta  
View profile  
 More options Feb 11, 10:32 pm
From: Aman Gupta <themastermi...@gmail.com>
Date: Wed, 11 Feb 2009 19:32:02 -0800
Local: Wed, Feb 11 2009 10:32 pm
Subject: Re: [amqp] subscribe gobbles messages
The ack version works as expected, correct?

When you subscribe, rabbit start sending you messages off of the queue
until the TCP buffer fills up.. since this buffer is pretty big
(especially compared to the size of each message), most of the
messages end up getting dequeued and send to the consumer. When you
ctrl+c and aren't used ack mode, these messages are effectively lost.

The solution is to use ack mode, or to use MQ.pop so you explicitly
ask for a message to be dequeued.

1.5 also contains some new flow control features specifically for this
problem. They should be trivial to add to the library as soon as I
figure out how to use them.

  Aman


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Ben Hood  
View profile  
 More options Feb 12, 3:31 am
From: Ben Hood <0x6e6...@gmail.com>
Date: Thu, 12 Feb 2009 08:31:33 +0000
Local: Thurs, Feb 12 2009 3:31 am
Subject: Re: [amqp] Re: subscribe gobbles messages
Aman,

On Thu, Feb 12, 2009 at 3:32 AM, Aman Gupta <themastermi...@gmail.com> wrote:
> 1.5 also contains some new flow control features specifically for this
> problem. They should be trivial to add to the library as soon as I
> figure out how to use them.

You might be interested in the basic.qos command that allows a
consumer to set it's own prefetch window size.

Ben


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Aman Gupta  
View profile  
 More options Feb 12, 3:35 am
From: Aman Gupta <themastermi...@gmail.com>
Date: Thu, 12 Feb 2009 00:35:28 -0800
Local: Thurs, Feb 12 2009 3:35 am
Subject: Re: [amqp] Re: subscribe gobbles messages
How about this api:

# open channel
mq = MQ.new

# prefetch up to 10 messages
mq.prefetch(10)

# subscribe to a queue
mq.queue('name').subscribe{ |msg|
  puts msg

}

  Aman


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
amos  
View profile  
 More options Feb 12, 2:07 pm
From: amos <famosea...@gmail.com>
Date: Thu, 12 Feb 2009 11:07:07 -0800 (PST)
Local: Thurs, Feb 12 2009 2:07 pm
Subject: Re: subscribe gobbles messages
try running the ack mode. if you ctrl+c after a couple of lines of
'foo', you'll still see 1000 messages on the queue. is that because
the ack isn't synchronous? if you let the program run all the way to
then end, then the queue will be down to 0.

i was having trouble with pop(). how would you rewrite the consumer to
pop all 1000 messages?

On Feb 11, 7:32 pm, Aman Gupta <themastermi...@gmail.com> wrote:


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Aman Gupta  
View profile  
 More options Feb 12, 2:38 pm
From: Aman Gupta <themastermi...@gmail.com>
Date: Thu, 12 Feb 2009 11:38:25 -0800
Local: Thurs, Feb 12 2009 2:38 pm
Subject: Re: [amqp] Re: subscribe gobbles messages
You're using sleep(1) in your ack example, which blocks the reactor
and doesn't give the acks a chance to be sent over the network. Here's
an example using pop:

queue = MQ.new.queue('name')
queue.pop{ |msg|
  # if msg is nil, the queue was empty
  if msg
    process(msg)
  end

  # wait two seconds and pop another message off
  EM.add_timer(2){ queue.pop }

}

  Aman


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
amos  
View profile  
 More options Feb 12, 2:52 pm
From: amos <famosea...@gmail.com>
Date: Thu, 12 Feb 2009 11:52:18 -0800 (PST)
Local: Thurs, Feb 12 2009 2:52 pm
Subject: Re: subscribe gobbles messages
thanks, i think pop will work better.

the sleep(1) in my example was just representing processing on our
production machines. so if instead of sleep, i fetch messages from
memcached, fetch records from db, and send email, the result will be
the same: no ack sent. is there a way to have ack block?

On Feb 12, 11:38 am, Aman Gupta <themastermi...@gmail.com> wrote:


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Aman Gupta  
View profile  
 More options Feb 12, 3:31 pm
From: Aman Gupta <themastermi...@gmail.com>
Date: Thu, 12 Feb 2009 12:31:54 -0800
Local: Thurs, Feb 12 2009 3:31 pm
Subject: Re: [amqp] Re: subscribe gobbles messages
there are two solutions to this problem:

one is to use asynchronous libraries for everything such that there
are never any blocking actions. you can use em-mysql
(http://github.com/tmm1/em-mysql) to access a mysql database,
eventedcache (http://github.com/cliffmoon/eventedcache) for memcache
access and EM::P::SmtpClient to send emails.

the other is use use ruby threads to run your blocking code, such that
the main reactor can still continue to run alongside instead of
getting blocked as well. you can do this using EM.defer:

MQ.queue('name').subscribe{ |info, msg|
  EM.defer(proc{
    process(msg)
  }, proc{
    info.ack
  })

}

  Aman


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
amos  
View profile  
 More options Feb 13, 3:57 pm
From: amos <famosea...@gmail.com>
Date: Fri, 13 Feb 2009 12:57:24 -0800 (PST)
Local: Fri, Feb 13 2009 3:57 pm
Subject: Re: subscribe gobbles messages
i guess what i'm saying is that acks caught in tcp buffer create a
dangerous situation when your client dies (or is manually stopped).
the work is done but never acked. when the buffer consists of a 1000
acks, that's a lot of lost but completed work. i imagine that moving
to non-blocking libraries would minimize the delay, but it still seems
like you'll lose work. maybe i'm wrong. it's a difficult theory to
test without converting my entire application to use em.

i'm surprised more people don't run into this issue. i'm not doing
anything special with the client.

but thanks again for the advice. very helpful.

On Feb 12, 12:31 pm, Aman Gupta <themastermi...@gmail.com> wrote:


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Aman Gupta  
View profile  
 More options Feb 13, 4:04 pm
From: Aman Gupta <themastermi...@gmail.com>
Date: Fri, 13 Feb 2009 13:04:19 -0800
Local: Fri, Feb 13 2009 4:04 pm
Subject: Re: [amqp] Re: subscribe gobbles messages
the ack is only lost if you cause the reactor to block. another way to
mitigate this is by doing your blocking action inside a EM.next_tick,
which will happen after the EM buffers have been written to the
network:

MQ.queue('name').subscribe{ |info, msg|
  info.ack
  EM.next_tick{
    process(msg)
  }


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
amos  
View profile  
 More options Mar 4, 2:01 pm
From: amos <famosea...@gmail.com>
Date: Wed, 4 Mar 2009 11:01:34 -0800 (PST)
Local: Wed, Mar 4 2009 2:01 pm
Subject: Re: subscribe gobbles messages
pop seems funky (rabbit crashed a couple of times) so i'm going back
to this method. unfortunately, i still haven't figured out how to
flush the ack buffer. i end up processing more messages than the
server recognizes. is there some stop command that will ensure that
all acks are sent to the server before the client quits? EM.stop and
AMQP.stop still leave messages un-acked.

On Feb 13, 1:04 pm, Aman Gupta <themastermi...@gmail.com> wrote:


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Aman Gupta  
View profile  
 More options Mar 4, 2:09 pm
From: Aman Gupta <themastermi...@gmail.com>
Date: Wed, 4 Mar 2009 11:09:35 -0800
Local: Wed, Mar 4 2009 2:09 pm
Subject: Re: [amqp] Re: subscribe gobbles messages
AMQP.stop should not leave the buffer unacked. Try setting a prefetch window:

  MQ.send(AMQP::Protocol::Basic::Qos.new(:prefetch_count=>1, :global => true))

  Aman


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
Ben Hood  
View profile  
 More options Mar 5, 4:40 am
From: Ben Hood <0x6e6...@gmail.com>
Date: Thu, 5 Mar 2009 09:40:18 +0000
Local: Thurs, Mar 5 2009 4:40 am
Subject: Re: [amqp] Re: subscribe gobbles messages
Amos,

On 4 Mar 2009, at 19:01, amos <famosea...@gmail.com> wrote:

> pop seems funky (rabbit crashed a couple of times)

If Rabbit crashed for some reason, it would be interesting (from a  
server perspective) to be able to reproduce this, because the client  
should ordinarily not be able to crash the broker. Do you have any  
server logs at all? Is it possible to provide a stripped down version  
of your program that can reproduce a crash?

Ben


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
amos  
View profile  
 More options Mar 5, 11:27 am
From: amos <famosea...@gmail.com>
Date: Thu, 5 Mar 2009 08:27:14 -0800 (PST)
Local: Thurs, Mar 5 2009 11:27 am
Subject: Re: subscribe gobbles messages
it happened 2x when the client machines were under heavy load (2x
number of cores). i'm speaking with alexis richardson about analyzing
the log files. i'll let you know what we discover. simple client code
would be something like:

queue = mq.queue(queue_name, :durable => true)
        queue.pop(:ack => true) do |info, task|
          if task
            task = Marshal.load(task)
            process(task)
            info.ack
          end
          EM.add_timer(0.001){ queue.pop }
        end

On Mar 5, 1:40 am, Ben Hood <0x6e6...@gmail.com> wrote:


    Reply to author    Forward  
You must Sign in before you can post messages.
To post a message you must first join this group.
Please update your nickname on the subscription settings page before posting.
You do not have the permission required to post.
End of messages
« Back to Discussions « Newer topic     Older topic »

Create a group - Google Groups - Google Home - Terms of Service - Privacy Policy
©2009 Google