How to use delayed plugin in PHP?

1,933 views
Skip to first unread message

James Titcumb

unread,
Sep 21, 2015, 10:06:38 AM9/21/15
to rabbitmq-users
Hi there,

Trying to use the rabbitmq_delayed_message_exchange plugin to send delayed messages with a PHP client.


I'm not 100% sure, but I think the issue is related to not setting the x-delay being set correctly on the message, but I'm not sure how to do this using videlalvaro/php-amqplib (and I can't find any PHP examples of how to do this)

$metadata = [
    'x-delay' => '10000',
];
$message = new AMQPMessage('hello', $metadata);

Any help would be appreciated.

Thanks
James

Alvaro Videla

unread,
Sep 21, 2015, 11:09:41 AM9/21/15
to rabbitm...@googlegroups.com
You seem to be declaring the exchange as "direct": https://gist.github.com/asgrim/c48be025dbcdf7d72c79#file-consumer-php-L29 but it has to be "x-delayed-message"

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

James Titcumb

unread,
Sep 21, 2015, 11:21:48 AM9/21/15
to rabbitmq-users
Hi Alvaro,

Yes indeed, you're right. Additionally, the message should be sent with:

$metadata = [
    'application_headers' => new AMQPTable([
        'x-delay' => 10000,
    ]),
];
$message = new AMQPMessage('hello', $metadata);

(specifically, `application_headers` was missing, and should be an AMQPTable)

Thanks for your help :)

Thanks
James

Enlai Chu

unread,
Oct 14, 2015, 1:30:55 PM10/14/15
to rabbitmq-users
Alvaro,

I stumbled upon your demo on how to set delayed messages (https://github.com/videlalvaro/php-amqplib/blob/19a1753fc1d7ea5b1b1d0de732b551df54ce8a98/demo/delayed_message.php). Thanks for that! 

However, I'm getting INTERNAL_ERROR when I run the demo and can't seem to find the issue. Any pointers? Error log below.

Thanks!   

=ERROR REPORT==== 14-Oct-2015::20:12:30 ===

** Generic server <0.13328.0> terminating

** Last message in was {'$gen_cast',

                           {method,

                               {'basic.publish',0,<<"delayed_exchange">>,

                                   <<>>,false,false},

                               {content,60,none,

                                   <<48,0,0,0,0,13,7,120,45,100,101,108,97,121,

                                     73,0,0,27,88,2>>,

                                   rabbit_framing_amqp_0_9_1,

                                   [<<"hello">>]},

                               flow}}

** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.13323.0>,

                            <0.13326.0>,<0.13323.0>,

                            <<"127.0.0.1:39965 -> 127.0.0.1:5672">>,

                            {lstate,<0.13327.0>,false,false},

                            none,1,

                            {[],[]},

                            {user,<<"guest">>,

                                [administrator],

                                rabbit_auth_backend_internal,

                                {internal_user,<<"guest">>,

                                    <<163,61,69,227,33,127,81,111,6,60,107,73,

                                      130,16,237,155,234,254,227,73>>,

                                    [administrator]}},

                            <<"/">>,<<"delayed_queue">>,

                            {dict,0,16,16,8,80,48,

                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                 []},

                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                  [],[]}}},

                            {dict,0,16,16,8,80,48,

                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                 []},

                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                  [],[]}}},

                            {dict,0,16,16,8,80,48,

                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                 []},

                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                  [],[]}}},

                            {set,0,16,16,8,80,48,

                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                 []},

                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                  [],[]}}},

                            {dict,0,16,16,8,80,48,

                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                 []},

                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                  [],[]}}},

                            {set,0,16,16,8,80,48,

                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                 []},

                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],

                                  [],[]}}},

                            <0.13321.0>,

                            {state,fine,5000,#Ref<0.0.2.55186>},

                            false,1,

                            {{0,nil},{0,nil}},

                            [],

                            [{<<"authentication_failure_close">>,bool,true},

                             {<<"publisher_confirms">>,bool,true},

                             {<<"consumer_cancel_notify">>,bool,true},

                             {<<"exchange_exchange_bindings">>,bool,true},

                             {<<"basic.nack">>,bool,true},

                             {<<"connection.blocked">>,bool,true}],

                            none}

** Reason for termination == 

** {function_clause,

       [{rabbit_delayed_message_utils,get_msg,

            [{delivery,false,<0.13328.0>,

                 {basic_message,

                     {resource,<<"/">>,exchange,<<"delayed_exchange">>},

                     [<<>>],

                     {content,60,

                         {'P_basic',undefined,undefined,

                             [{<<"x-delay">>,signedint,7000}],

                             2,undefined,undefined,undefined,undefined,

                             undefined,undefined,undefined,undefined,

                             undefined,undefined},

                         <<48,0,0,0,0,13,7,120,45,100,101,108,97,121,73,0,0,27,

                           88,2>>,

                         rabbit_framing_amqp_0_9_1,

                         [<<"hello">>]},

                     <<169,4,190,143,33,82,20,170,124,153,141,120,12,46,106,205>>,

                     true},

                 undefined}]},

        {lists,foldl,3},

        {rabbit_delayed_message_utils,get_delay,1},

        {rabbit_exchange_type_delayed_message,delay_message,2},

        {rabbit_exchange_type_delayed_message,route,2},

        {rabbit_exchange,route1,3},

        {rabbit_exchange,route,2},

        {rabbit_channel,handle_method,3}]}


=ERROR REPORT==== 14-Oct-2015::20:12:30 ===

AMQP connection <0.13323.0> (running), channel 1 - error:

{function_clause,

    [{rabbit_delayed_message_utils,get_msg,

         [{delivery,false,<0.13328.0>,

              {basic_message,

                  {resource,<<"/">>,exchange,<<"delayed_exchange">>},

                  [<<>>],

                  {content,60,

                      {'P_basic',undefined,undefined,

                          [{<<"x-delay">>,signedint,7000}],

                          2,undefined,undefined,undefined,undefined,undefined,

                          undefined,undefined,undefined,undefined,undefined},

                      <<48,0,0,0,0,13,7,120,45,100,101,108,97,121,73,0,0,27,88,

                        2>>,

                      rabbit_framing_amqp_0_9_1,

                      [<<"hello">>]},

                  <<169,4,190,143,33,82,20,170,124,153,141,120,12,46,106,205>>,

                  true},

              undefined}]},

     {lists,foldl,3},

     {rabbit_delayed_message_utils,get_delay,1},

     {rabbit_exchange_type_delayed_message,delay_message,2},

     {rabbit_exchange_type_delayed_message,route,2},

     {rabbit_exchange,route1,3},

     {rabbit_exchange,route,2},

     {rabbit_channel,handle_method,3}]}


=WARNING REPORT==== 14-Oct-2015::20:12:30 ===

Non-AMQP exit reason '{function_clause,

                       [{rabbit_delayed_message_utils,get_msg,

                         [{delivery,false,<0.13328.0>,

                           {basic_message,

                            {resource,<<"/">>,exchange,<<"delayed_exchange">>},

                            [<<>>],

                            {content,60,

                             {'P_basic',undefined,undefined,

                              [{<<"x-delay">>,signedint,7000}],

                              2,undefined,undefined,undefined,undefined,

                              undefined,undefined,undefined,undefined,

                              undefined,undefined},

                             <<48,0,0,0,0,13,7,120,45,100,101,108,97,121,73,0,

                               0,27,88,2>>,

                             rabbit_framing_amqp_0_9_1,

                             [<<"hello">>]},

                            <<169,4,190,143,33,82,20,170,124,153,141,120,12,46,

                              106,205>>,

                            true},

                           undefined}]},

                        {lists,foldl,3},

                        {rabbit_delayed_message_utils,get_delay,1},

                        {rabbit_exchange_type_delayed_message,delay_message,2},

                        {rabbit_exchange_type_delayed_message,route,2},

                        {rabbit_exchange,route1,3},

                        {rabbit_exchange,route,2},

                        {rabbit_channel,handle_method,3}]}'


=INFO REPORT==== 14-Oct-2015::20:12:30 ===

closing AMQP connection <0.13323.0> (127.0.0.1:39965 -> 127.0.0.1:5672)

Alvaro Videla

unread,
Oct 14, 2015, 2:02:34 PM10/14/15
to rabbitmq-users
What version of the plugin are you using against which version of the server?

Enlai Chu

unread,
Oct 14, 2015, 2:37:56 PM10/14/15
to rabbitmq-users
Hi Alvaro, 

I'm running RabbitMQ 3.1.5 and Plugin 

rabbitmq_delayed_message_exchange-0.0.1-rmq3.5.x-9bf265e4.ez. Ah, from the GitHub page, it looks like I need to run RabbitMQ 3.5.3 or later.  Will give that a shot.


Thanks for the pointer! 

Reply all
Reply to author
Forward
0 new messages