Delayed Message Plugin - What am I doing wrong?

342 views
Skip to first unread message

Thiago Arrais

unread,
Sep 27, 2016, 8:17:05 AM9/27/16
to rabbitmq-users
Is anyone using RabbitMQ to implement delayed messages? I'm trying the rabbitmq_delayed_message_exchange plugin
as per the [2015 blogpost][1], but messages are not getting delivered after the delay and I'm getting errors on connection
close.

I'm using RabbitMQ 3.6.5 installed from the .deb from the apt repository on rabbitmq.com. I've tried using both the
Java Client and the Ruby Bunny client.

I've posted my [Java client code to github][2], if anyone wants to run it. When I remove the delay header (line 22), though,
the message gets delivered normally:

18      byte[] messageBodyBytes = "delayed payload".getBytes();
19      AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
20      Map<String, Object> headers = new HashMap<String, Object>();
21      headers.put("x-delay", 5000);
22      props.headers(headers); //commenting this makes the message delivery work
23      channel.basicPublish("delayed-ex-v2", "", props.build(), messageBodyBytes);

This is the error message I get when running the Java code above:

    Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0)
        [...]
        at com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:767)
        at com.thiagoarrais.DelayedSend.main(DelayedSend.java:24)

Michael Klishin

unread,
Sep 27, 2016, 8:18:45 AM9/27/16
to rabbitm...@googlegroups.com, Thiago Arrais
See server log. 

On 27 September 2016 at 15:17:30, Thiago Arrais (thiago...@gmail.com) wrote:
> Is anyone using RabbitMQ to implement delayed messages? I'm trying the rabbitmq_delayed_message_exchange
> plugin
> as per the [2015 blogpost][1], but messages are not getting delivered after
> the delay *and* I'm getting errors on connection
> close.
>
> I'm using RabbitMQ 3.6.5 installed from the .deb from the apt repository on
> rabbitmq.com. I've tried using both the
> Java Client and the Ruby Bunny client.
>
> I've posted my [Java client code to github][2], if anyone wants to run it.
> When I remove the delay header (line 22), though,
> the message gets delivered normally:
>
> 18 byte[] messageBodyBytes = "delayed payload".getBytes();
> 19 AMQP.BasicProperties.Builder props = new
> AMQP.BasicProperties.Builder();
> 20 Map headers = new HashMap();
> 21 headers.put("x-delay", 5000);
> 22 props.headers(headers); //commenting this makes the message
> delivery work
> 23 channel.basicPublish("delayed-ex-v2", "", props.build(),
> messageBodyBytes);
>
> This is the error message I get when running the Java code above:
>
> Exception in thread "main" com.rabbitmq.client.AlreadyClosedException:
> connection is already closed due to connection error; protocol method:
> #method(reply-code=541, reply-text=INTERNAL_ERROR,
> class-id=0, method-id=0)
> [...]
> at
> com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:767)
> at com.thiagoarrais.DelayedSend.main(DelayedSend.java:24)
>
> [1]:
> https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
> [2]: https://github.com/thiagoarrais/rabbitmq-delayed-send/blob/master/src/main/java/com/thiagoarrais/DelayedSend.java
>
> --
> You received this message because you are subscribed to the Google Groups "rabbitmq-users"
> group.
> To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
> To post to this group, send an email to rabbitm...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.
>

--
MK

Staff Software Engineer, Pivotal/RabbitMQ


Thiago Arrais

unread,
Sep 27, 2016, 1:35:34 PM9/27/16
to rabbitmq-users, thiago...@gmail.com
Here's what I could gather from `/var/log/rabbitmq/rabbit.log`:


ROR REPORT==== 27-Sep-2016::12:34:02 ===
** Generic server rabbit_delayed_message terminating 
** Last message in was {delay_message,
                        {exchange,
                         {resource,<<"/">>,exchange,<<"delayed-ex-v2">>},
                         'x-delayed-message',false,false,false,
                         [{<<"x-delayed-type">>,longstr,<<"fanout">>}],
                         undefined,undefined,
                         {[],[]}},
                        {delivery,false,false,<0.15024.0>,
                         {basic_message,
                          {resource,<<"/">>,exchange,<<"delayed-ex-v2">>},
                          [<<>>],
                          {content,60,
                           {'P_basic',undefined,undefined,
                            [{<<"x-delay">>,signedint,5000}],
                            undefined,undefined,undefined,undefined,undefined,
                            undefined,undefined,undefined,undefined,undefined,
                            undefined},
                           <<32,0,0,0,0,13,7,120,45,100,101,108,97,121,73,0,0,
                             19,136>>,
                           rabbit_framing_amqp_0_9_1,
                           [<<"delayed payload">>]},
                          <<79,220,42,172,168,5,32,250,45,15,32,47,123,167,102,
                            219>>,
                          false},
                         undefined,noflow},
                        5000}
** When Server state == {state,not_set}
** Reason for termination == 
** {'function not exported',
       [{erlang,system_time,[milli_seconds],[]},
        {rabbit_delayed_message,internal_delay_message,4,
            [{file,"src/rabbit_delayed_message.erl"},{line,179}]},
        {rabbit_delayed_message,handle_call,3,
            [{file,"src/rabbit_delayed_message.erl"},{line,122}]},
        {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,580}]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,237}]}]}

I've got newbie eyes here, but this looks like some probable
incompatibility between my RabbitMQ server and the plugin.

Maybe I'm looking into the wrong logfile.

Should I dig in and look for a compatible version combo? Or should I just ditch
this plugin and try to delay my messages with some other solution?

Michael Klishin

unread,
Sep 27, 2016, 1:47:12 PM9/27/16
to rabbitm...@googlegroups.com
You use a version of the plugin that requires Erlang 18.x (community plugins
can require any version they please, not the same as RabbitMQ does)
but the Erlang version actually in used seems to be something lower.

To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Thiago Arrais

unread,
Sep 28, 2016, 8:14:52 PM9/28/16
to rabbitm...@googlegroups.com

Thanks for the help. I'll take a look at that.

Any pointers on how to change the erlang interpreter that rabbitmq runs on?


To post to this group, send email to rabbitm...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

--
You received this message because you are subscribed to a topic in the Google Groups "rabbitmq-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/rabbitmq-users/YmgYq4qTCyY/unsubscribe.
To unsubscribe from this group and all its topics, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

Thiago Arrais

unread,
Oct 2, 2016, 9:05:10 AM10/2/16
to rabbitm...@googlegroups.com

I've only got around to testing this now. Worked perfectly. Thanks, MK!

Reply all
Reply to author
Forward
0 new messages