amqp 0.7.0

13 views
Skip to first unread message

Aman Gupta

unread,
Dec 3, 2009, 7:12:58 PM12/3/09
to ruby...@googlegroups.com
I plan on releasing a new gem in the next few days. It's been a while
since the last release, and I know a lot of you have forked the
project and submitted patches for various bugs and new features. I
want to make sure all this stuff gets merged before this release, so
if you have a known issue, bug, patch or feature request please reply
to this thread.

Aman

Ezra Zygmuntowicz

unread,
Dec 3, 2009, 7:17:47 PM12/3/09
to ruby...@googlegroups.com
Here are my two monkey patches to amqp that I'd like to see get
rolled in:

class MQ
class Queue
# Asks the broker to redeliver all unacknowledged messages on a
# specifieid channel. Zero or more messages may be redelivered.
#
# * requeue (default false)
# If this parameter is false, the message will be redelivered to
the original recipient.
# If this flag is true, the server will attempt to requeue the
message, potentially then
# delivering it to an alternative subscriber.
#
def recover requeue = false
@mq.callback{
@mq.send Protocol::Basic::Recover.new({ :requeue => requeue })
}
self
end
end
end

# monkey patch to the amqp gem that adds :no_declare => true option
for new
# Exchange objects. This allows us to send messeages to exchanges that
are
# declared by the mappers and that we have no configuration
priviledges on.
# temporary until we get this into amqp proper
MQ::Exchange.class_eval do
def initialize mq, type, name, opts = {}
@mq = mq
@type, @name, @opts = type, name, opts
@mq.exchanges[@name = name] ||= self
@key = opts[:key]

@mq.callback{
@mq.send AMQP::Protocol::Exchange::Declare.new({ :exchange =>
name,
:type => type,
:nowait =>
true }.merge(opts))
} unless name == "amq.#{type}" or name == '' or opts[:no_declare]
end
end



Thanks
Ezra Zygmuntowicz
e...@engineyard.com



Jordan Sissel

unread,
Dec 3, 2009, 7:21:34 PM12/3/09
to ruby...@googlegroups.com
monkeypatch to log when we are reconnecting, and also delay
reconnection to not swamp things.

# http://github.com/tmm1/amqp/issues/#issue/3
# This is our (lame) hack to at least notify the user that something is
# wrong.
module AMQP
module Client
alias :original_reconnect :reconnect
def reconnect(*args)
$logger.warn "reconnecting to broker (bad MQ settings?)"

# some rate limiting
sleep(5)

original_reconnect(*args)
end
end
end

Ideally, being able to register a handler for the reconnect event
would be nice. I can make a patch for this if you want.

-Jordan

> --
>
> You received this message because you are subscribed to the Google Groups "AMQP" group.
> To post to this group, send email to ruby...@googlegroups.com.
> To unsubscribe from this group, send email to ruby-amqp+...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/ruby-amqp?hl=en.
>
>
>

Aman Gupta

unread,
Dec 3, 2009, 7:26:59 PM12/3/09
to AMQP


On Dec 3, 7:21 pm, Jordan Sissel <j...@semicomplete.com> wrote:
> On Thu, Dec 3, 2009 at 4:12 PM, Aman Gupta <themastermi...@gmail.com> wrote:
> > I plan on releasing a new gem in the next few days. It's been a while
> > since the last release, and I know a lot of you have forked the
> > project and submitted patches for various bugs and new features. I
> > want to make sure all this stuff gets merged before this release, so
> > if you have a known issue, bug, patch or feature request please reply
> > to this thread.
>
> >  Aman
>
> monkeypatch to log when we are reconnecting, and also delay
> reconnection to not swamp things.
>
> #http://github.com/tmm1/amqp/issues/#issue/3
> # This is our (lame) hack to at least notify the user that something is
> # wrong.
> module AMQP
>   module Client
>     alias :original_reconnect :reconnect
>     def reconnect(*args)
>       $logger.warn "reconnecting to broker (bad MQ settings?)"
>
>       # some rate limiting
>       sleep(5)
>
>       original_reconnect(*args)
>     end
>   end
> end

sleep is a bad idea in EM land, as it blocks everything. The better
solution is to use EM.add_timer(5){ original_reconnect(*args) }. I'll
update the reconnect logic to make it more sane.

>
> Ideally, being able to register a handler for the reconnect event
> would be nice. I can make a patch for this if you want.

When would you want to get an event? You can currently do:

conn.connection_status{ |s|
s == :connected || s == :disconnected
}

where conn is either AMQP.conn for the global connection, or the
connection object returned from AMQP.connect if you connected
manually.

Aman

Jordan Sissel

unread,
Dec 3, 2009, 7:38:09 PM12/3/09
to ruby...@googlegroups.com
Yeah, in our case, the only thing using EM is AMQP, so we didn't care
if the sleep blocked things. You are right about the timer being a
better solution.

>>
>> Ideally, being able to register a handler for the reconnect event
>> would be nice. I can make a patch for this if you want.
>
> When would you want to get an event? You can currently do:
>
> conn.connection_status{ |s|
>  s == :connected || s == :disconnected
> }

For now, I only need it for logging the event (so we can correlate
with other failures).

-Jordan

Aman Gupta

unread,
Dec 3, 2009, 7:39:15 PM12/3/09
to ruby...@googlegroups.com
Are the separate disconnected and connected events via
#connection_status good enough, or would you like something in
addition.

Aman

Aman Gupta

unread,
Dec 3, 2009, 7:41:37 PM12/3/09
to AMQP
Here's the list of issues I know about:

- MQ::Queue#unsubscribe doesn't wait until confirmation before
removing the subscribe block
- immediate publishing causes the client to explode when a message is returned
- reconnect when subscribed to queues with routing keys is broken
- global Thread.current[:mq] is not cleaned up properly on
disconnect in all cases
- better reconnect interval (exponential fallback?)
- missing support for MQ::Queue#purge
- missing support for MQ::Queue#recover
- missing support for :no_declare=>true on MQ::Exchange

Other things I would like to do before the release:

- better documentation about prefetch and a getting started section
in the README that emphasizes prefetch+subscribe with ack
- raise errors from MQ::Queue#publish when not connected to a server
- better cluster support with support for a list of hosts to connect
to (and support for forwarding responses from a server)
- allow mq.queue(nil) to get a server-assigned temporary queue name
(instead of using rand() hacks locally)

And somewhere down the road, I would love to:

- merge a sync api from either carrot or bunny
- redo all the docs to be actual documentation instead of amqp spec
documentation

Aman

Matt Todd

unread,
Dec 3, 2009, 7:47:01 PM12/3/09
to ruby...@googlegroups.com
I've seen times when the reconnects would basically hammer RabbitMQ so hard it took 100% CPU; the problem was an authorization error but AMQP just tried eternally.

This reconnect interval should help quite a bit. I especially would like the exponential (or linear, at least) intervals.

I've also noticed that there's no way to throttle subscription callbacks. I don't know if this is something exactly necessary, but it could be nice to be able to say "I only want to process 2,500 messages per second" or something similar.

No patches here, though, sorry :(

Matt
Matt Todd
Highgroove Studios
www.highgroove.com
cell: 404-314-2612
blog: maraby.org

Scout - Web Monitoring and Reporting Software
www.scoutapp.com

Aman Gupta

unread,
Dec 3, 2009, 7:50:41 PM12/3/09
to ruby...@googlegroups.com
On Thu, Dec 3, 2009 at 7:47 PM, Matt Todd <chio...@gmail.com> wrote:
> I've seen times when the reconnects would basically hammer RabbitMQ so hard
> it took 100% CPU; the problem was an authorization error but AMQP just tried
> eternally.
> This reconnect interval should help quite a bit. I especially would like the
> exponential (or linear, at least) intervals.

I wonder if it should eventually give up? In the case of an incorrect
auth, it will never succeed right?

> I've also noticed that there's no way to throttle subscription callbacks. I
> don't know if this is something exactly necessary, but it could be nice to
> be able to say "I only want to process 2,500 messages per second" or
> something similar.

The best way to deal with this is via prefetch.

Aman

Daniel DeLeo

unread,
Dec 3, 2009, 8:24:59 PM12/3/09
to ruby...@googlegroups.com
I've found that I need a no_declare option for MQ::Queue as well.

    def initialize mq, name, opts = {}
      @mq = mq
      @opts = opts
      @bindings ||= {}
      @mq.queues[@name = name] ||= self
      @mq.callback{
      @mq.send Protocol::Queue::Declare.new({ :queue => name,
                                              :nowait => true }.merge(opts))
      } unless opts[:no_declare]
    end

I'd also like amqp to treat Protocol::Connection::Close as a
disconnect. I did this in two commits, available here:
http://github.com/danielsdeleo/amqp/tree/require_connect_to_publish

Otherwise, amqp is blissfully ignorant when Rabbit is shutdown and
continues publishing until the network connection is dropped.

Thanks,
Dan DeLeo

Matt Todd

unread,
Dec 3, 2009, 8:24:57 PM12/3/09
to ruby...@googlegroups.com
> This reconnect interval should help quite a bit. I especially would like the
> exponential (or linear, at least) intervals.

I wonder if it should eventually give up? In the case of an incorrect
auth, it will never succeed right?

I've considered this as well. Perhaps there should be a limit of some sort.

Ideally, we could fire off an event when we've decided to give up so that we can log it and/or send an alert to an admin.
 
> I've also noticed that there's no way to throttle subscription callbacks. I
> don't know if this is something exactly necessary, but it could be nice to
> be able to say "I only want to process 2,500 messages per second" or
> something similar.

The best way to deal with this is via prefetch.

I thought as much. Need to mess around with this I guess to see if it suits.

Matt

Dmitriy

unread,
Dec 3, 2009, 8:39:23 PM12/3/09
to AMQP
Support for basic.return (if a msg is published with immediate=true or
mandatory=true).

http://github.com/somic/amqp/commit/bf01cb202e0a02f4e10f43abc8d5aab58b5e0fde

Not sure that this functionality is used extensively though.

- Dmitriy

Chuck Remes

unread,
Dec 3, 2009, 9:40:37 PM12/3/09
to ruby...@googlegroups.com

On Dec 3, 2009, at 6:41 PM, Aman Gupta wrote:

> Here's the list of issues I know about:
>
> <snip>
>
> Other things I would like to do before the release:
>
> - better documentation about prefetch and a getting started section
> in the README that emphasizes prefetch+subscribe with ack
> <snip>

I'd like to help with this. The original docs I submitted were wrong in a lot of places. I understand AMQP a bit better now and could probably do a much better job.

I'll try and make some progress on this during December.

I'll also look at adding a Quickstart and a FAQ section.

cr

Aman Gupta

unread,
Dec 3, 2009, 9:48:52 PM12/3/09
to ruby...@googlegroups.com
Awesome, that would be very helpful.

Aman

>
> cr

Mathias Meyer

unread,
Dec 4, 2009, 6:39:21 AM12/4/09
to ruby-amqp
On Fri, Dec 4, 2009 at 1:21 AM, Jordan Sissel <j...@semicomplete.com> wrote:

> monkeypatch to log when we are reconnecting, and also delay
> reconnection to not swamp things.
>
> # http://github.com/tmm1/amqp/issues/#issue/3
> # This is our (lame) hack to at least notify the user that something is
> # wrong.
> module AMQP
>  module Client
>    alias :original_reconnect :reconnect
>    def reconnect(*args)
>      $logger.warn "reconnecting to broker (bad MQ settings?)"
>
>      # some rate limiting
>      sleep(5)
>
>      original_reconnect(*args)
>    end
>  end
> end
>
> Ideally, being able to register a handler for the reconnect event
> would be nice. I can make a patch for this if you want.
>
We made a patch for the connect/disconnect event handler being exposed properly:
http://github.com/peritor/amqp/commit/4adb2b72b105749f4541271eecca9eaf4aac6659

Cheers, Mathias
--
http://paperplanes.de
http://twitter.com/roidrage

Mathias Meyer

unread,
Dec 4, 2009, 6:39:48 AM12/4/09
to ruby-amqp
On Fri, Dec 4, 2009 at 1:47 AM, Matt Todd <chio...@gmail.com> wrote:

> I've seen times when the reconnects would basically hammer RabbitMQ so hard
> it took 100% CPU; the problem was an authorization error but AMQP just tried
> eternally.
>
> This reconnect interval should help quite a bit. I especially would like the
> exponential (or linear, at least) intervals.

We've seen this problem as well, and it's an inherent problem in the
AMQP spec, so unfortunately that problem can only be solved by
reducing the time. RabbitMQ can choose to implement some sort of rate
limit according to the spec, but so far it doesn't. For me giving up
at some point is not the greatest idea, because then I'll have to go
out and restart all clients manually once the problem is fixed. We've
been using this patch with some success:
http://github.com/peritor/amqp/commit/f33bda9f97fa34ae63ec73138e197bd22856440c

It's not great, and it's still possible that a client will DoS your
messaging server, but it's not something that can be fully fixed in
the client library in my opinion, so at least not hammering it with
auth requests would be nice.

Matt Todd

unread,
Dec 4, 2009, 12:42:21 PM12/4/09
to ruby...@googlegroups.com
We've seen this problem as well, and it's an inherent problem in the
AMQP spec, so unfortunately that problem can only be solved by
reducing the time. RabbitMQ can choose to implement some sort of rate
limit according to the spec, but so far it doesn't. For me giving up
at some point is not the greatest idea, because then I'll have to go
out and restart all clients manually once the problem is fixed. We've
been using this patch with some success:
http://github.com/peritor/amqp/commit/f33bda9f97fa34ae63ec73138e197bd22856440c

Yeah, having to restart the clients has been a problem in the past for us, though I've been planning to add a signal handler (ie, USR1) to attempt a reconnect again.

Also, was just thinking, but are messages queued up when being published dfrom the clients even if they are not connecting? Do they give an error if not? Or is this something that I need to add to my application logic to test if the connection is valid before queueing messages, otherwise I either hold in memory or persist to disk? (Obviously, I can only hold so much in memory before it gets to be a problem, if this is the case).

Matt

Aman Gupta

unread,
Dec 4, 2009, 12:48:23 PM12/4/09
to ruby...@googlegroups.com
On Fri, Dec 4, 2009 at 12:42 PM, Matt Todd <chio...@gmail.com> wrote:
>> We've seen this problem as well, and it's an inherent problem in the
>> AMQP spec, so unfortunately that problem can only be solved by
>> reducing the time. RabbitMQ can choose to implement some sort of rate
>> limit according to the spec, but so far it doesn't. For me giving up
>> at some point is not the greatest idea, because then I'll have to go
>> out and restart all clients manually once the problem is fixed. We've
>> been using this patch with some success:
>>
>> http://github.com/peritor/amqp/commit/f33bda9f97fa34ae63ec73138e197bd22856440c
>
> Yeah, having to restart the clients has been a problem in the past for us,
> though I've been planning to add a signal handler (ie, USR1) to attempt a
> reconnect again.
> Also, was just thinking, but are messages queued up when being published
> dfrom the clients even if they are not connecting? Do they give an error if
> not? Or is this something that I need to add to my application logic to test
> if the connection is valid before queueing messages, otherwise I either hold
> in memory or persist to disk? (Obviously, I can only hold so much in memory
> before it gets to be a problem, if this is the case).

Yes, currently the published messages will get queued up locally
indefinitely. I'd like to change the behavior instead to raise an
exception when #published is called but #connected? is false.

Aman

> Matt
>
>
> --
> Matt Todd
> Highgroove Studios
> www.highgroove.com
> cell: 404-314-2612
> blog: maraby.org
>
> Scout - Web Monitoring and Reporting Software
> www.scoutapp.com
>

Aman Gupta

unread,
Dec 8, 2009, 3:30:56 PM12/8/09
to ruby...@googlegroups.com
This is something you only call once on the channel right? I'm
thinking of adding it to MQ directly, so you can use it like:

mq = MQ.new
mq.prefetch(10)
mq.recover

Make sense?

Aman

Jacob Burkhart

unread,
Jan 9, 2010, 11:50:14 AM1/9/10
to ruby...@googlegroups.com
We have chosen to solve reconnect flooding in our fork with :max_retry
and :reconnect_timer config options:

http://github.com/brontes3d/amqp/commit/4b45b43b5eaea5a5e3b35b26aaf7c2cc4c21f9ed

But I agree an incremental backoff would be better.

We also wanted to be able to specify fallback_servers in the
configs... so if one server in the network of RabbitMQ servers was
unavailable the client would try the next one in the list:

http://github.com/brontes3d/amqp/commit/c55cb344dcabf04203417f3d07ea1de9ba08b4f8

Also with multiple servers, Rabbit likes to try and load balance for
us, for that to work we need to handle
AMQP::Protocol::Connection::Redirect

http://github.com/brontes3d/amqp/commit/a0b8aba8c495fd4f0d1dc3d49f8af2eb0e38a6bc

If you merge any of these, be sure to get the fixes to them committed later :-)

http://github.com/brontes3d/amqp/commits/master/

thanks,
Jacob

Mathias Meyer

unread,
Jan 9, 2010, 12:10:20 PM1/9/10
to ruby-amqp
On Sat, Jan 9, 2010 at 5:50 PM, Jacob Burkhart <igot...@gmail.com> wrote:

> We have chosen to solve reconnect flooding in our fork with :max_retry
> and :reconnect_timer config options:
>
> http://github.com/brontes3d/amqp/commit/4b45b43b5eaea5a5e3b35b26aaf7c2cc4c21f9ed
>
> But I agree an incremental backoff would be better.
>
> We also wanted to be able to specify fallback_servers in the
> configs... so if one server in the network of RabbitMQ servers was
> unavailable the client would try the next one in the list:
>
> http://github.com/brontes3d/amqp/commit/c55cb344dcabf04203417f3d07ea1de9ba08b4f8
>
> Also with multiple servers, Rabbit likes to try and load balance for
> us, for that to work we need to handle
> AMQP::Protocol::Connection::Redirect
>
> http://github.com/brontes3d/amqp/commit/a0b8aba8c495fd4f0d1dc3d49f8af2eb0e38a6bc
>
> If you merge any of these, be sure to get the fixes to them committed later :-)
>
> http://github.com/brontes3d/amqp/commits/master/
>

Sweet, sweet patches you made there.

My +1 to get all of them into 0.7.0. Great improvements.

Reply all
Reply to author
Forward
0 new messages