Consuming Streams using an AMQP 1.0 client

292 views
Skip to first unread message

Jonathon Learmond

unread,
Jun 13, 2022, 2:03:44 PM6/13/22
to rabbitmq-users
Hi everyone,
Based on the streams documentation available:
I can't find any confirmation whether consuming a stream using an AMQP 1.0 client is supported, or, is it only supported using 0.9.1 clients?
If 1.0 clients are supported, is offset consuming supported like how it is using the x-stream-offset consumer argument for 0.9.1 clients?

Much appreciated!
Message has been deleted

Gabriele Santomaggio

unread,
Jun 14, 2022, 5:12:55 AM6/14/22
to rabbitmq-users
Sorry for the previous answer. I deleted it. I was not correct. 
Yes, we support x-stream-offset  for AMQP 1.0.

Regards
-
Gabriele 


kjnilsson

unread,
Jun 14, 2022, 6:15:28 AM6/14/22
to rabbitmq-users
Hi,

You can consume using AMQP 1.0 using a source filter and you will get a message annotation (x-stream-offset) included with every message.

Add "rabbitmq:stream-offset-spec" to your source filter set and use the same syntax as for AMQP 0.9.1 to specify the offset [1]


Jonathon Learmond

unread,
Jun 14, 2022, 11:01:50 AM6/14/22
to rabbitmq-users
Great! Thanks for the quick responses. I'll test it out with my specific setup and report back with some details.

Tony Tian

unread,
Jun 23, 2022, 3:53:34 PM6/23/22
to rabbitmq-users
Hello everyone,

I am trying to use QPID-proton-cpp AMQP 1.0 client library https://qpid.apache.org/releases/qpid-proton-0.37.0/proton/cpp/api/index.html  to connect to rabbitmq stream. I am having issue cause I keep get a inequivalent error.

amqp:precondition-failed: PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'stream-test' in vhost '/': received none but current is the value 'stream' of type 'longstr'

I have been trying to provide this information to rabbitMQ with so many different approaches but none of them work. Do you have any clue? 

Thanks a lot!
Tony


This is the log information
2022-06-23 13:40:58.168806-06:00 [info] <0.21847.4> accepting AMQP connection <0.21847.4> (127.0.0.1:38860 -> 127.0.0.1:5672)
2022-06-23 13:40:58.176029-06:00 [error] <0.21876.4> Channel error on connection <0.21856.4> (127.0.0.1:38860 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 2:
2022-06-23 13:40:58.176029-06:00 [error] <0.21876.4> operation queue.declare caused a channel exception precondition_failed: inequivalent arg 'x-queue-type' for queue 'stream-test2' in vhost '/': received none but current is the value 'stream' of type 'longstr'
2022-06-23 13:40:58.176403-06:00 [warning] <0.21853.4> Closing session for connection <0.21847.4>:
2022-06-23 13:40:58.176403-06:00 [warning] <0.21853.4> {'v1_0.error',{symbol,<<"amqp:precondition-failed">>},
2022-06-23 13:40:58.176403-06:00 [warning] <0.21853.4>               {utf8,<<"PRECONDITION_FAILED - inequivalent arg 'x-queue-type' for queue 'stream-test2' in vhost '/': received none but current is the value 'stream' of type 'longstr'">>},
2022-06-23 13:40:58.176403-06:00 [warning] <0.21853.4>               undefined}
2022-06-23 13:40:58.177923-06:00 [warning] <0.21847.4> closing AMQP connection <0.21847.4> (127.0.0.1:38860 -> 127.0.0.1:5672):
2022-06-23 13:40:58.177923-06:00 [warning] <0.21847.4> client unexpectedly closed TCP connection

Tcp dump information is attached
tcpdump

Arnaud Cogoluègnes

unread,
Jun 27, 2022, 3:12:24 AM6/27/22
to rabbitmq-users
Somehow you're trying to re-create the stream-test2 queue with different arguments. This is something you should fix on the client side, maybe just a setting. It's hard to say without the code.

You can also ask on the Qpid support channels.

Reply all
Reply to author
Forward
0 new messages