Shovel from AMQP 1.0 Source (Azure Service Bus) to RabbitMQ topic exchange without setting dest-exchange-key

249 views
Skip to first unread message

Gerald Haberfehlner

unread,
Jul 31, 2020, 9:59:12 AM7/31/20
to rabbitmq-users
Hi,


I am trying to use the Shovel Plugin (dynamic shovel) to shovel messages from Azure Service Bus to RabbitMQ. This basically works, but only if I set a dest-exchange-key in the shovel configuration, but I would prefer to reuse the source topic (AMQP 1.0 Subject) as the routing key in the destination if possible (as documented in the shovel documentation “Routing key when using dest-exchange. If this is not set, the original message's routing key will be used.”). When not setting a value for “dest-exchange-key” the shovel fails to shovel any messages. 


I am unfortunately not familiar with Erlang, but looking at the code I find hints that what I want to do could be supported:


Looking at the amqp1.0 code of RabbitMQ https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/99edc7f06d6c69fbd7b7d93b11b881d014810c15/src/rabbit_amqp1_0_message.erl#L158 the subject seems to be translated to the routing_key.


https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/99edc7f06d6c69fbd7b7d93b11b881d014810c15/src/rabbit_amqp1_0_incoming_link.erl#L140 seems to deal with incoming routing keys.


Any hints/feedback on the topic are highly appreciated


Thanks in advance

Gerald



Here is what my shovel configuration looks like:

{

   "value": {

     "ack-mode": "on-confirm",

     "dest-add-forward-headers": false,

     "src-address": "default_queue",

     "src-protocol": "amqp10",

     "src-uri": "amqps://<<redacted>>=@<<redacted>>.servicebus.windows.net?sasl=plain",

     "src-delete-after": "never",

     "dest-protocol": "amqp091",

     "dest-exchange": "development",

     //"dest-exchange-key": "de.override",

     "dest-uri": "amqp://rabbitmq:<<redacted>>@localhost"

   },

   "vhost": "/",

   "component": "shovel",

   "name": "experimentation-shovel"

}


The development exchange is a topic exchange. I see in the RabbitMQ logs that a value for the AMQP subject is present in the messages:

{'v1_0.properties',

           {utf8,<<"d0640c176e1c4ff9ab42d5e70f57a465">>

           },undefined,

           {utf8,<<"de.topicto">>

           },

           {utf8,<<"my.subject">>

           },undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined

       },



Log output (when no dest-echange-key is configured and messages are available):

2020-07-21 09:50:08.155 [info] <0.7375.0> connection <0.7375.0> (127.0.0.1:43709 -> 127.0.0.1:5672 - Shovel experimentation-shovel): user 'rabbitmq' authenticated and granted access to vhost '/'

2020-07-21 09:50:37.655 [warning] <0.7356.0> Shovel 'geha-experimentation-shovel' in virtual host '/' could not handle a destination message {'EXIT',<0.7384.0>,shutdown}

2020-07-21 09:50:37.655 [error] <0.7382.0> Supervisor {<0.7382.0>,amqp_channel_sup} had child writer started with rabbit_writer:start_link(#Port<0.910>, 1, 131072, rabbit_framing_amqp_0_9_1, <0.7384.0>, {<<"client 127.0.0.1:43709 -> 127.0.0.1:5672">>,1}, false, 1000000000) at <0.7385.0> exit with reason bad argument in call to erlang:size(undefined) in rabbit_framing_amqp_0_9_1:shortstr_size/1 line 187 in context child_terminated

2020-07-21 09:50:37.656 [error] emulator Error in process <0.7385.0> on node 'rab...@10.128.19.90' with exit value:{badarg,[{erlang,size,[undefined],[]},{rabbit_framing_amqp_0_9_1,shortstr_size,1,[{file,"src/rabbit_framing_amqp_0_9_1.erl"},{line,187}]},{rabbit_framing_amqp_0_9_1,encode_method_fields,1,[{file,"src/rabbit_framing_amqp_0_9_1.erl"},{line,1131}]},{rabbit_binary_generator,build_simple_method_frame,3,[{file,"src/rabbit_binary_generator.erl"},{line,59}]},{rabbit_writer,assemble_frames,5,[{file,"src/rabbit_writer.erl"},{line,338}]},{rabbit_writer,internal_send_command_async,3,[{file,"src/rabbit_writer.erl"},{line,371}]},{rabbit_writer,handle_message,3,[{file,"src/rabbit_writer.erl"},{line,238}]},{rabbit_writer,mainloop1,2,[{file,"src/rabbit_writer.erl"},{line,222}]}]}

2020-07-21 09:50:37.656 [error] <0.7382.0> Supervisor {<0.7382.0>,amqp_channel_sup} had child writer started with rabbit_writer:start_link(#Port<0.910>, 1, 131072, rabbit_framing_amqp_0_9_1, <0.7384.0>, {<<"client 127.0.0.1:43709 -> 127.0.0.1:5672">>,1}, false, 1000000000) at <0.7385.0> exit with reason reached_max_restart_intensity in context shutdown

2020-07-21 09:50:37.913 [error] <0.7356.0> Shovel 'experimentation-shovel' in virtual host '/' is stopping, reason: {noproc,{gen_server,call,[<0.7384.0>,next_publish_seqno,60000]}}

2020-07-21 09:50:37.915 [info] <0.7375.0> closing AMQP connection <0.7375.0> (127.0.0.1:43709 -> 127.0.0.1:5672 - Shovel geha-experimentation-shovel, vhost: '/', user: 'rabbitmq')

2020-07-21 09:50:37.915 [error] <0.7356.0> ** Generic server <0.7356.0> terminating ** Last message in was {amqp10_msg,{link_ref,receiver,<0.7365.0>,0},{amqp10_msg,{'v1_0.transfer',{uint,0},{uint,1},{binary,<<70,152,106,155,244,194,83,73,142,67,24,179,190,206,117,114>>},{uint,0},undefined,false,undefined,undefined,undefined,undefined,true},{'v1_0.header',undefined,undefined,{uint,1209600000},undefined,{uint,1}},{'v1_0.delivery_annotations',[{{symbol,<<"x-opt-lock-token">>},{uuid,<<155,106,152,70,194,244,73,83,142,67,24,179,190,206,117,114>>}}]},{'v1_0.message_annotations',[{{symbol,<<"x-opt-via-partition-key">>},{utf8,<<"de.topicviaPartitionKey">>}},{{symbol,<<"x-opt-enqueued-time">>},{timestamp,1595325007859}},{{symbol,<<"x-opt-sequence-number">>},{long,201}},{{symbol,<<"x-opt-enqueue-sequence-number">>},{long,0}},{{symbol,<<"x-opt-locked-until">>},{timestamp,1595325067892}}]},{'v1_0.properties',{utf8,<<"ae23c88f61a84f6e98ae5d3af77f7adb">>},undefined,{utf8,<<"de.topicto">>},{utf8,<<"de.topiclabel">>},undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined,undefined},{'v1_0.application_properties',[{{utf8,<<"routing_key">>},{utf8,<<"de.uprouting_key">>}},{{utf8,<<"subject">>},{utf8,<<"de.upsubject">>}},{{utf8,<<"dest-exchange-key">>},{utf8,<<"de.dest-exchange-key">>}},{{utf8,<<"exchange-key">>},{utf8,<<"de.exchange-key">>}}]},[{'v1_0.data',<<"M...">>}],...}}** When Server state == {state,undefined,undefined,undefined,undefined,{<<"/">>,<<"experimentation-shovel">>},dynamic,#{ack_mode => on_confirm,dest => #{current => {<0.7368.0>,<0.7384.0>,<<"amqp://localhost">>},dest_exchange => <<"development">>,fields_fun => #Fun<rabbit_shovel_parameters.11.955803>,module => rabbit_amqp091_shovel,props_fun => #Fun<rabbit_shovel_parameters.12.955803>,resource_decl => #Fun<rabbit_shovel_parameters.10.955803>,unacked => #{1 => 0},uris => ["amqp://rabbitmq:<<redacted>@localhost"]},name => <<"experimentation-shovel">>,reconnect_delay => 5,shovel_type => dynamic,source => #{current => #{conn => <0.7358.0>,link => {link_ref,receiver,<0.7365.0>,0},session => <0.7365.0>,uri => "amqps://<<redacted>>=@<<redacted>>.servicebus.windows.net?sasl=plain"},delete_after => never,last_acked_tag => -1,module => rabbit_amqp10_shovel,prefetch_count => 1000,remaining => unlimited,remaining_unacked => unlimited,source_address => <<"default_queue">>,uris => ["amqps://<<redacted>>=@<<redacted>>.servicebus.windows.net?sasl=plain"]}},undefined,undefined,undefined,undefined,undefined}** Reason for termination == ** {noproc,{gen_server,call,[<0.7384.0>,next_publish_seqno,60000]}}

2020-07-21 09:50:37.916 [error] <0.7356.0> CRASH REPORT Process <0.7356.0> with 0 neighbours exited with reason: no such process or port in call to gen_server:call(<0.7384.0>, next_publish_seqno, 60000) in gen_server2:terminate/3 line 1183

2020-07-21 09:50:37.916 [error] <0.7305.0> Supervisor {<0.7305.0>,rabbit_shovel_dyn_worker_sup} had child {<<"/">>,<<"experimentation-shovel">>} started with rabbit_shovel_worker:start_link(dynamic, {<<"/">>,<<"experimentation-shovel">>}, [{<<"ack-mode">>,<<"on-confirm">>},{<<"dest-add-forward-headers">>,false},{<<"dest-exchange">>,<<"...">>},...]) at <0.7356.0> exit with reason no such process or port in call to gen_server:call(<0.7384.0>, next_publish_seqno, 60000) in context child_terminated

2020-07-21 09:50:37.916 [info] <0.7358.0> Conn handle_info {'EXIT',<0.7356.0>,{noproc,{gen_server,call,[<0.7384.0>,next_publish_seqno,60000]}}} close_sent

2020-07-21 09:50:37.936 [warning] <0.7359.0> AMQP 1.0 connection socket was closed, connection state: 'expecting_frame_header'

2020-07-21 09:50:43.172 [info] <0.7458.0> accepting AMQP connection <0.7458.0> (127.0.0.1:47381 -> 127.0.0.1:5672)




Reply all
Reply to author
Forward
Message has been deleted
Message has been deleted
0 new messages