[rabbitmq-discuss] erlang-client issue, discarding content?

3 views
Skip to first unread message

Suhail Doshi

unread,
Jul 30, 2009, 3:22:29 PM7/30/09
to rabbitmq-discuss
My producer essentially dies arbitrarily because of this: 
Discarding content bearing method ({'basic.publish',1,<<"records">>,
                                       <<"event">>,false,false})

Any ideas?
Sincerely,
Suhail Doshi

--
http://mixpanel.com
Blog: http://blog.mixpanel.com

Suhail Doshi

unread,
Jul 30, 2009, 3:24:10 PM7/30/09
to rabbitmq-discuss
When that happens, my swap goes up, the CPU load goes up, consumers are not consuming anything. Very peculiar.

Suhail

Suhail Doshi

unread,
Jul 30, 2009, 3:39:54 PM7/30/09
to rabbitmq-discuss
Looking in my logs shows a large gap in time, in bold is where it is:

=INFO REPORT==== 30-Jul-2009::17:26:56 ===
    alarm_handler: {set,{system_memory_high_watermark,[]}}

=INFO REPORT==== 30-Jul-2009::17:26:57 ===
    alarm_handler: {clear,system_memory_high_watermark}

=INFO REPORT==== 30-Jul-2009::17:29:37 ===
    alarm_handler: {set,{system_memory_high_watermark,[]}}

=WARNING REPORT==== 30-Jul-2009::17:30:23 ===
exception on TCP connection <0.7304.42> from 127.0.0.1:48982
connection_closed_abruptly

=INFO REPORT==== 30-Jul-2009::17:30:25 ===
closing TCP connection <0.7304.42> from 127.0.0.1:48982

=INFO REPORT==== 30-Jul-2009::17:32:07 ===
    alarm_handler: {clear,system_memory_high_watermark}

=INFO REPORT==== 30-Jul-2009::19:00:09 ===
accepted TCP connection on 0.0.0.0:5672 from 127.0.0.1:38647

=INFO REPORT==== 30-Jul-2009::19:00:09 ===
starting TCP connection <0.13117.46> from 127.0.0.1:38647

=INFO REPORT==== 30-Jul-2009::19:00:33 ===
Rolling persister log to "/var/lib/rabbitmq/mnesia/rabbit/rabbit_persister.LOG.previous"

Matthias Radestock

unread,
Jul 30, 2009, 3:46:55 PM7/30/09
to Suhail Doshi, rabbitmq-discuss
Suhail Doshi wrote:
> Looking in my logs shows a large gap in time, in bold is where it is:
>
> =INFO REPORT==== 30-Jul-2009::17:26:56 ===
> alarm_handler: {set,{system_memory_high_watermark,[]}}

This indicates that rabbit got close to running out of memory and told
all connected clients to stop sending any more messages. Hence the
"Discarding ..." message you were seeing in the erlang client. Your
consumers should continue to receive messages, albeit slowly if rabbit
is swapping.

Perhaps your consumers are not acknowledging received messages, causing
them to keep piling up at the server?

I recommend checking the queue lengths/sizes with 'rabbitmqctl list_queues'.


Regards,

Matthias.

_______________________________________________
rabbitmq-discuss mailing list
rabbitmq...@lists.rabbitmq.com
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

Suhail Doshi

unread,
Jul 30, 2009, 3:52:18 PM7/30/09
to Matthias Radestock, rabbitmq-discuss
mplivelog1 ~: sudo /usr/sbin/rabbitmqctl list_queues -p myqueue name consumers messages messages_ready

Listing queues ...
storage 5 1 0
...done.

I currently have it running again after a restart and that's usually the response I get, usually only 0-5 items in the queue, since it's being processed.

When the error was occurring and I checked the queue, there were effectively *zero* items in the queue according to that command I ran above in the queue. In python I definitely do acknowledge items via:

self.channel.basic_ack(data.delivery_tag)

where self.channel is amqp.Connection(host=host, **info).channel()

Suhail

On Thu, Jul 30, 2009 at 12:46 PM, Matthias Radestock <matt...@lshift.net> wrote:
Suhail Doshi wrote:
Looking in my logs shows a large gap in time, in bold is where it is:

=INFO REPORT==== 30-Jul-2009::17:26:56 ===
   alarm_handler: {set,{system_memory_high_watermark,[]}}

This indicates that rabbit got close to running out of memory and told all connected clients to stop sending any more messages. Hence the "Discarding ..." message you were seeing in the erlang client. Your consumers should continue to receive messages, albeit slowly if rabbit is swapping.

Perhaps your consumers are not acknowledging received messages, causing them to keep piling up at the server?

I recommend checking the queue lengths/sizes with 'rabbitmqctl list_queues'.


Regards,

Matthias.



Suhail Doshi

unread,
Jul 30, 2009, 3:54:40 PM7/30/09
to Matthias Radestock, rabbitmq-discuss
Here's the erlang module I use to send items to rabbit, it's largely adapted from code open sourced online:


-export([amqp_lifecycle/0, send_message/5, log/2]).

-include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit.hrl").
-include_lib("rabbitmq-erlang-client/rabbitmq_server/include/rabbit_framing.hrl").
-include("rabbitmq-erlang-client/include/amqp_client.hrl").

-record(rabbit_info, {channel, ticket, exchange, routing_key}).

amqp_lifecycle() ->
    User = "mixpanel",
    Password = "mixpanel0816",
    Realm = <<"mixpanel">>, %% virtual_host
    Connection = amqp_connection:start(User, Password, "127.0.0.1", Realm),
    Channel = amqp_connection:open_channel(Connection),
    Access = #'access.request'{
        realm = Realm,
        exclusive = false,
        passive = true,
        active = true,
        write = true,
        read = true
    },
    #'access.request_ok'{ticket = Ticket} = amqp_channel:call(Channel, Access),
    
    Q = <<"storage">>,
    X = <<"records">>,
    BindKey = <<"event">>,
 
    QueueDeclare = #'queue.declare'{ticket = Ticket, queue = Q,
                                    passive = false, durable = true,
                                    exclusive = false, auto_delete = false,
                                    nowait = false, arguments = []},
                                    
    #'queue.declare_ok'{queue = Q} = amqp_channel:call(Channel, QueueDeclare),
 
    ExchangeDeclare = #'exchange.declare'{ticket = Ticket,
                                          exchange = X, type = <<"direct">>,
                                          passive = false, durable = true,
                                          auto_delete = false, internal = false,
                                          nowait = false, arguments = []},
                                          
    #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExchangeDeclare),
    QueueBind = #'queue.bind'{ticket = Ticket,
                              queue = Q,
                              exchange = X,
                              routing_key = BindKey,
                              nowait = false, arguments = []},
    #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),
    
    RabbitInfo = #'rabbit_info'{
        channel = Channel,
        ticket = Ticket,
        exchange = X,
        routing_key = BindKey
    },
    RabbitInfo.
    
send_message(Channel, Ticket, X, RoutingKey, Payload) ->
    BasicPublish = #'basic.publish'{ticket = Ticket,
                                    exchange = X,
                                    routing_key = RoutingKey,
                                    mandatory = false,
                                    immediate = false},
    BasicProperties = amqp_util:basic_properties(),
    Properties = BasicProperties#'P_basic'{delivery_mode=2}, %% Persistence plz
    Content = #content{class_id = 60,
         properties = Properties,
         properties_bin = none,
         payload_fragments_rev = [Payload]
        },
    amqp_channel:cast(Channel, BasicPublish, Content).

log(Key,Value) ->
    io:format("~p: ~p~n",[Key,Value]).


I do the following to send items to the queue:

event_queue_data(QueueInfo, Data) ->
    send_message(
        QueueInfo#'rabbit_info'.channel,
        QueueInfo#'rabbit_info'.ticket,
        QueueInfo#'rabbit_info'.exchange,
        QueueInfo#'rabbit_info'.routing_key,
        list_to_binary(Data)
    ),
    1.

Suhail Doshi

unread,
Jul 30, 2009, 4:15:34 PM7/30/09
to Matthias Radestock, rabbitmq-discuss
I attached some images that might provide better insight into the problem. You can basically see when everything went down and horribly wrong: memory usage, swap in/out, and load average.

Suhail
Picture 1.png
Picture 2.png
Picture 3.png

Suhail Doshi

unread,
Jul 30, 2009, 5:17:00 PM7/30/09
to Matthias Radestock, rabbitmq-discuss
The only thing I can think of is....definitely not acking properly but consumers are definitely re-processing items so it's like it acks but doesn't get removed out of memory?

What's the proper way to ack something in python? This is what I do:

self.channel.basic_ack(data.delivery_tag)

where data comes into the recv handler and channel is amqp.Connection(host=host, **info).channel()

Matthias Radestock

unread,
Jul 30, 2009, 5:41:26 PM7/30/09
to Suhail Doshi, rabbitmq-discuss
Suhail,

Suhail Doshi wrote:
> I attached some images that might provide better insight into the
> problem. You can basically see when everything went down and horribly
> wrong: memory usage, swap in/out, and load average.

Is it definitely the rabbitmq server that is busy and eating all the
memory, rather than some other process on your system (e.g. the erlang
client, python consumers, desktop apps, etc)?

If it *is* the server, can you observe the memory growing gradually from
the start, or does it suddenly spike?


Matthias

Suhail Doshi

unread,
Jul 30, 2009, 6:12:15 PM7/30/09
to Matthias Radestock, rabbitmq-discuss
Yeah definitely has to be, the moment I kill it the memory drops back down and it is gradually growing, you can even see it gradually growing in the images.

Suhail


On Thu, Jul 30, 2009 at 2:41 PM, Matthias Radestock <matt...@lshift.net> wrote:
Suhail,


Suhail Doshi wrote:
I attached some images that might provide better insight into the problem. You can basically see when everything went down and horribly wrong: memory usage, swap in/out, and load average.

Is it definitely the rabbitmq server that is busy and eating all the memory, rather than some other process on your system (e.g. the erlang client, python consumers, desktop apps, etc)?

If it *is* the server, can you observe the memory growing gradually from the start, or does it suddenly spike?


Matthias



Matthias Radestock

unread,
Jul 30, 2009, 7:37:48 PM7/30/09
to Suhail Doshi, rabbitmq-discuss
Suhail,

Suhail Doshi wrote:
> Yeah definitely has to be, the moment I kill it the memory drops back
> down and it is gradually growing, you can even see it gradually growing
> in the images.

Do you definitely see the rabbit process consuming the memory? How big
does it get? And what about CPU usage?

The reason I am asking is that killing the server also affects the
clients, i.e. it is possible that a *client* is consuming all the
memory, and releases it as soon as the server connection is severed.

So please check the *per-process* memory and CPU stats.

Now, if it really is the server that is eating the memory, please run
all the various list_* commands in rabbitmqctl to see whether any of
them are showing growth.

Finally, try publishing messages without marking them as persistent, and
see whether that changes the behaviour.


Regards,

Matthias.

Suhail Doshi

unread,
Jul 31, 2009, 3:33:50 PM7/31/09
to Matthias Radestock, rabbitmq-discuss
The memory leak is in the consumers, thanks Matthias.

Suhail


On Thu, Jul 30, 2009 at 4:37 PM, Matthias Radestock <matt...@lshift.net> wrote:
Suhail,


Suhail Doshi wrote:
Yeah definitely has to be, the moment I kill it the memory drops back down and it is gradually growing, you can even see it gradually growing in the images.

Do you definitely see the rabbit process consuming the memory? How big does it get? And what about CPU usage?

The reason I am asking is that killing the server also affects the clients, i.e. it is possible that a *client* is consuming all the memory, and releases it as soon as the server connection is severed.

So please check the *per-process* memory and CPU stats.

Now, if it really is the server that is eating the memory, please run all the various list_* commands in rabbitmqctl to see whether any of them are showing growth.

Finally, try publishing messages without marking them as persistent, and see whether that changes the behaviour.


Regards,

Matthias.



Suhail Doshi

unread,
Jul 31, 2009, 10:41:57 PM7/31/09
to Matthias Radestock, rabbitmq-discuss
Hi,


I basically keep the channel object and connection open the entire time so I do not do the statup stuff over and over, is this a bad idea? Once the channel has been retrieved, my consumer doesn't deal with it again, it just keeps accept requests from it. I don't see any global objects of mine increasing in size so I can only think I am doing something wrong from a rabbitmq perspective.

def connect_consumer():
    consumer = EventQueueConsumer()
    channel = consumer.get_channel()
    
    logging.info("Consumer started")
    
    while True:
        channel.wait()

Tony Garnock-Jones

unread,
Aug 4, 2009, 12:13:44 PM8/4/09
to Suhail Doshi, rabbitmq-discuss
Hi Suhail,

Suhail Doshi wrote:
> I basically keep the channel object and connection open the entire time
> so I do not do the statup stuff over and over, is this a bad idea?

On the contrary, that's a *good* idea...

> I don't see any global objects of mine increasing in size so I can
> only think I am doing something wrong from a rabbitmq perspective.

Hmm. Are you using more than one channel per connection? If so, the
other channels may be being starved, which may lead to a backlog of work
accumulating in them. You may need to restructure the main loop of your
application, if you're using more than one channel.

Regards,
Tony
--
[][][] Tony Garnock-Jones | Mob: +44 (0)7905 974 211
[][] LShift Ltd | Tel: +44 (0)20 7729 7060
[] [] http://www.lshift.net/ | Email: to...@lshift.net

Reply all
Reply to author
Forward
0 new messages