Shovel Not Moving Properties to Azure Service Bus


James D'Arcy

May 21, 2019, 11:56:20 PM
to rabbitmq-users
Hi All

I have been able to get the rmq shovel working up to the point where messages are arriving successfully into the ASB queue.  I have also been successful in declaring the shovel details using both dynamic and static definitions.

However, only one message property from the source message arrives at the Service Bus consumer: "content_type". Other message properties, such as correlation_id, group_id and subject, do not arrive.

No application defined properties arrive at all.

I am certainly not an Erlang developer. Nevertheless, I have had a look through the shovel Erlang code, and the file "rabbit_amqp10_shovel.erl" has some code that looks like it only copies content_type:

set_message_properties(Props, Msg) ->
    maps:fold(fun(delivery_mode, 2, M) ->
                      amqp10_msg:set_headers(#{durable => true}, M);
                 (content_type, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{content_type => rabbit_data_coercion:to_binary(Ct)}, M);
                 (_, _, M) -> M
              end, Msg, Props).


I have been able to set property values manually within the advanced.config file with the "properties" clause of the shovel destination definition, but I really need them to be set dynamically from the rmq (C++) client application.


   
   [%% A named shovel worker.
     {ism_tvcrecognition_shovel,
       [
        %% List the source broker(s) from which to consume.
        %%
        {source,
          [
           {protocol, amqp091},
           {uris, ["amqp://"]},
           {queue, <<"ism.tvcrecognition-q">>},
           {declarations, [
              {'exchange.declare',
                [ {exchange, <<"ism.tvcrecognition-x">>},
                  {type, <<"direct">>},
                  durable
                ]},
              {'queue.declare',
                [ {queue, <<"ism.tvcrecognition-q">>},
                  {arguments,
                     [{<<"x-message-ttl">>, long, 60000}]}
                ]},
              {'queue.bind',
                [ {exchange, <<"ism.tvcrecognition-x">>},
                  {queue,    <<"ism.tvcrecognition-q">>},
                  {routing_key, <<"ism.tvcrecognition">>}
                ]}
           ]}
          ]},

        %% List the destination broker(s) to publish to.
        %%
        {destination,
          [
           {protocol, amqp10},
           {uris, ["amqps://<REDACTED>:<REDACTED>@<REDACTED>.servicebus.windows.net:5671?versions=tlsv1.2"]},
           {add_forward_headers, true},
           {add_timestamp_header, true},
           {target_address, <<"ism-sbt-tvcrecognition">>},
           {application_properties, [{<<"MyNameIs">>, <<"James">>}]},
           {properties, [
              {group_id, <<"1">>},
              {subject, <<"SendHashForMatching">>},
              {content_encoding, <<"UTF-8">>}
           ]}
          ]}
       ]}, %% End of ism_tvcrecognition_shovel



Incidentally, I have used rmq_tracer module and can see the correct properties arriving at the rmq server:


================================================================================
2019-05-22 1:43:40:939: Message published


Node:         rabbit@<REDACTED>
Connection:   172.17.0.1:36970 -> 172.17.0.2:5672
Virtual host: /
User:         guest
Channel:      1
Exchange:     ism.tvcrecognition-x
Routing keys: [<<"ism.tvcrecognition">>]
Routed queues: [<<"ism.tvcrecognition-q">>]
Properties:   [{<<"app_id">>,longstr,<<"ism.tvcrecognition">>},
               {<<"message_id">>,longstr, <<"dee0c4da-302f-40b0-abe3-de8ae4f908bf">>},
               {<<"correlation_id">>,longstr, <<"dee0c4da-302f-40b0-abe3-de8ae4f908bf">>},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"BodyEncoding">>,longstr,<<"Amqp">>},
                 {<<"ChannelId">>,long,21},
                 {<<"Environment">>,longstr,<<"dev">>},
                 {<<"MessageHandlerType">>,longstr,<<"SendForMatching">>},
                 {<<"Originator">>,longstr,<<"ism.tvcrecognition">>},
                 {<<"RegionId">>,long,1},
                 {<<"Source">>,longstr,<<"RAVEL">>},
                 {<<"VersionBuild">>,signedint,0},
                 {<<"VersionMajor">>,signedint,0},
                 {<<"VersionMinor">>,signedint,0}]},
               {<<"content_encoding">>,longstr,<<"UTF-8">>},
               {<<"content_type">>,longstr,<<"application/json">>}]
Payload:
{<REDACTED>}


================================================================================

Can anyone advise me on how to get message properties copied along with the message?

Kind regards


James

Karl Nilsson

May 22, 2019, 2:38:49 AM
to rabbitmq-users
Hi,

The set_message_properties function seems to only deal partially with 0.9.1 properties. Were your messages published to RabbitMQ using 1.0?

The unfortunate fact is that it is hard to avoid fidelity loss when translating from 1.0 -> 0.9.1 -> 1.0, as the 0.9.1 model (which is used inside RabbitMQ) is more restrictive than 1.0. That said, I think we can improve on what is here already. Would you mind raising an issue in the shovel repository for this?

The main challenge will be group-id as that isn't a property that exists in 0.9.1 and I am not sure the AMQP 1.0 plugin copies that into the 0.9.1 model either. Will take a look though.
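
To make the overlap between the two property sets concrete, here is a minimal sketch of copying only the properties that exist under the same name in both protocols. It is an illustration under stated assumptions, not the shovel's implementation: the module name props_sketch, the function to_amqp10/1 and the list in shared_keys/0 are hypothetical, and the input is assumed to be a map of 0.9.1 basic properties keyed by atoms with binary values.

```erlang
-module(props_sketch).
-export([to_amqp10/1]).

%% Hypothetical helper, not part of rabbitmq-shovel: fold a map of
%% AMQP 0.9.1 basic properties into a map of AMQP 1.0 properties,
%% keeping only keys that have a same-named counterpart in 1.0.
-spec to_amqp10(map()) -> map().
to_amqp10(Props091) ->
    maps:fold(fun copy/3, #{}, Props091).

%% Copy a binary value when its key is shared by both protocols.
copy(Key, Value, Acc) when is_binary(Value) ->
    case lists:member(Key, shared_keys()) of
        true  -> Acc#{Key => Value};
        false -> Acc
    end;
%% Anything else (non-binary values such as priority) is dropped.
copy(_Key, _Value, Acc) ->
    Acc.

%% Assumed list of same-named properties; group_id is deliberately
%% absent because 0.9.1 has no such property.
shared_keys() ->
    [message_id, correlation_id, reply_to, content_type, content_encoding].
```

Calling props_sketch:to_amqp10(#{correlation_id => <<"abc">>, priority => 5}) yields a map containing only correlation_id, which shows where properties such as priority and app_id would be lost unless they are mapped explicitly.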

Cheers
Karl

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/3eb4b642-f75a-4bd2-b3a3-b5228594b6ce%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


--
Karl Nilsson

James D'Arcy

May 22, 2019, 3:06:55 AM
to rabbitmq-users
Hi Karl

Thanks for your response.

FYI, my overall goal is to use a specialised broker process to move messages from each host into Azure Service Bus. Currently I have many processes running on each host, each sending out many messages directly to the ASB using the qpid proton C++ libraries. However, I am having all sorts of problems keeping these processes as reliable as I need them to be. I therefore want to create a broker on each host that collects the messages from each local process and forwards them to ASB. I have had no success with the qpid cpp broker connecting to ASB.

However, I have succeeded in getting the RMQ shovel to connect to ASB and all I need now are the message properties to be sent across.

In development I have re-coded the client processes to use the RMQ 0.9.1 protocol and those messages are being sent to the RMQ server.

So the process is simply 0.9.1 => 1.0, I am not using a 1.0 client protocol.

Also note that I am interested in moving application defined properties across as well - those marked as "headers" in the above RMQ trace output. 

I don't suppose there is any way I could call a method within the advanced.config file as part of the destination properties configuration? Something like:



 {destination,
         [
          {protocol, amqp10},
          {uris, ["amqps://<REDACTED>:<REDACTED>@<REDACTED>.servicebus.windows.net:5671?versions=tlsv1.2"]},
          {add_forward_headers, true},
          {add_timestamp_header, true},
          {target_address, <<"ism-sbt-tvcrecognition">>},
          {properties, [
            {correlation_id, GetSourcePropertyValue("message_id")},       %% <<== Correlation ID set dynamically from an incoming standard message property?
            {group_id, GetSourceApplicationPropertyValue("region_id")}    %% <<== Session ID set dynamically from an incoming application message property?
          ]}
         ]}
      ]}

Karl Nilsson

May 22, 2019, 3:23:41 AM
to rabbitmq-users
It isn't possible to call a function as part of the configuration. I think we can map correlation_id one-to-one, as it exists in both protocols. I am not sure how to map group_id. Perhaps something like a bit of custom mapping config might work. Are you adding region_id as a custom AMQP 0.9.1 header?
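
As a rough illustration of what such a custom mapping could look like, the sketch below takes a list of {HeaderName, Property} pairs and pulls the named entries out of a 0.9.1 headers table. Everything here is an assumption made for illustration: the module header_mapping_sketch, the function apply_mapping/2 and the mapping format do not exist in the shovel plugin. The headers table is assumed to have the {Name, Type, Value} shape shown in the trace earlier in this thread.

```erlang
-module(header_mapping_sketch).
-export([apply_mapping/2]).

%% Hypothetical sketch of a "custom mapping" step; neither this
%% function nor the mapping format exists in rabbitmq-shovel.
%%   Mapping: [{HeaderName :: binary(), Amqp10Property :: atom()}]
%%   Headers: AMQP 0.9.1 table, [{Name :: binary(), Type :: atom(), Value}]
%% Returns a map of AMQP 1.0 property names to binary values.
apply_mapping(Mapping, Headers) ->
    lists:foldl(
      fun({HeaderName, Property}, Acc) ->
              case lists:keyfind(HeaderName, 1, Headers) of
                  {HeaderName, _Type, Value} ->
                      Acc#{Property => to_binary(Value)};
                  false ->
                      %% Header absent on this message: leave the property unset.
                      Acc
              end
      end, #{}, Mapping).

%% group-id is a string in AMQP 1.0, so numeric header values are
%% rendered as text. Other value types are not handled in this sketch.
to_binary(V) when is_binary(V)  -> V;
to_binary(V) when is_integer(V) -> integer_to_binary(V);
to_binary(V) when is_list(V)    -> list_to_binary(V).
```

For example, header_mapping_sketch:apply_mapping([{<<"RegionId">>, group_id}], [{<<"RegionId">>, long, 1}]) returns #{group_id => <<"1">>}, while a message without that header yields an empty map.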


James D'Arcy

May 22, 2019, 3:46:28 AM
to rabbitmq-users
Yes, I'm adding region_id as a custom header.

I know nothing about erlang, however, I have been able to get the shovel source code and compile it from within IntelliJ.

Am I on the right track if I do something like the following (or is it just one step short of stupid)?


set_message_properties(Props, Msg) ->
    maps:fold(fun(delivery_mode, 2, M) ->
                      amqp10_msg:set_headers(#{durable => true}, M);
                 (message_id, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{message_id => rabbit_data_coercion:to_binary(Ct)}, M);
                 (correlation_id, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{correlation_id => rabbit_data_coercion:to_binary(Ct)}, M);
                 (content_encoding, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{content_encoding => rabbit_data_coercion:to_binary(Ct)}, M);
                 (delivery_mode, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{delivery_mode => rabbit_data_coercion:to_binary(Ct)}, M);
                 (priority, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{priority => rabbit_data_coercion:to_binary(Ct)}, M);
                 (reply_to, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{reply_to => rabbit_data_coercion:to_binary(Ct)}, M);
                 (type, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{type => rabbit_data_coercion:to_binary(Ct)}, M);
                 (app_id, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{app_id => rabbit_data_coercion:to_binary(Ct)}, M);
                 (content_type, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{content_type => rabbit_data_coercion:to_binary(Ct)}, M);
                 (cluster_id, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{cluster_id => rabbit_data_coercion:to_binary(Ct)}, M);
                 (headers, Ct, M) ->
                      amqp10_msg:set_properties(
                        #{headers => rabbit_data_coercion:to_binary(Ct)}, M);
                 (_, _, M) -> M
              end, Msg, Props).


Karl Nilsson

May 22, 2019, 4:35:14 AM
to rabbitmq-users
Not quite - here is the type spec for the amqp 1.0 message properties which will show you what keys are supported and the type of their values:

