Sending AMQP 1.0 stream messages does not work with Apache Qpid Proton

311 views
Skip to first unread message

Jack

unread,
Oct 16, 2023, 10:32:20 AM10/16/23
to rabbitmq-users
Hi Guys,

when sending AMQP 1.0 messages to a rabbitmq stream we get the following error message. 

inequivalent arg 'x-queue-type' for queue 'test-stream' in vhost '/': received none but current is the value 'stream' of type 'longstr'

But how can we give this value in the qpid sender configuration? There seems to be no way to actually handover the header argument x-queue-type = stream ?

Any help is appreciated. Thanks!

Karl Nilsson

unread,
Oct 16, 2023, 10:46:08 AM10/16/23
to rabbitm...@googlegroups.com
Apologies it needs to be: /amq/queue/queue_name

On Mon, 16 Oct 2023 at 15:44, Karl Nilsson <kjni...@gmail.com> wrote:
You need to use the `/amq/queue_name` target format to address a pre-declared queue.

Cheers
Karl

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/rabbitmq-users/45cdd339-7036-4ad5-8107-94123a9d639bn%40googlegroups.com.


--
Karl Nilsson


--
Karl Nilsson

Jack

unread,
Oct 16, 2023, 10:50:26 AM10/16/23
to rabbitmq-users
Thanks for your reply. However if I address a "normal" queue with qpid in rabbitmq it works without problems. As soon as I create a stream queue in rabbitmq it does not work anymore to send messages with qpid and we receive the above error. 
I guess we need to include the mentioned argument when creating a sender ?

Cheers Jack

Jack

unread,
Oct 16, 2023, 10:54:34 AM10/16/23
to rabbitmq-users
Just tried it with /amq/stream/name_of_the_stream but it does not work unfortunately.

Karl Nilsson

unread,
Oct 16, 2023, 10:56:02 AM10/16/23
to rabbitm...@googlegroups.com
What target format are you using?



--
Karl Nilsson

Jack

unread,
Oct 16, 2023, 11:05:36 AM10/16/23
to rabbitmq-users
What do you mean exactly with target format? We are using AMQP 1.0.

Karl Nilsson

unread,
Oct 16, 2023, 11:10:23 AM10/16/23
to rabbitm...@googlegroups.com
The target address string needs to have the `/amq/queue/` prefix to use a pre-declare queue

How about you show us a code snippet so we have something to work with?

It can be hard to help with minimal detail.

Cheers



--
Karl Nilsson

Jack

unread,
Oct 16, 2023, 11:15:04 AM10/16/23
to rabbitmq-users
Sure. Here are the most important parts of our qpid client configuration. It works fine with normal "classic" queues. The queue gets created if it does not exist, or if it exist already the message will be added to the the existing queue.
When we create a stream in rabbitmq and then try to add a message to the stream with the same coding we get the above error that we are missing the argument x-queue-type = stream:

"inequivalent arg 'x-queue-type' for queue 'test-stream' in vhost '/': received none but current is the value 'stream' of type 'longstr'"


Code snippets/partly code. In the moment when the create_sender is executed, the code fails/runs into a timeout.

my_domain = SSLDomain(SSLDomain.MODE_CLIENT)
    my_domain.set_credentials(
        cert_file="/app/swim-adapter/certs/client_certificate.pem",
        key_file="/app/swim-adapter/certs/client_key.pem",
        password=None,
    )
    my_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
    my_domain.set_trusted_ca_db("/app/swim-adapter/certs/ca.crt")

    try:
        conn = BlockingConnection(
            AMQP_URL,
            ssl_domain=my_domain,
            heartbeat=60,
            virtual_host="",
            allowed_mechs="PLAIN",
            allow_insecure_mechs=False,
            user=AMQP_USER,
            password=AMQP_PASSWORD,
            sasl_enabled=True,
        )

                topic_sender = conn.create_sender(
                    topic_name, options=CapabilityOptions()
                )

            delivery = topic_sender.send(
                Message(body=event.data)
            )


class CapabilityOptions(SenderOption):
    def apply(self, sender):
        # Deliveries; A terminus with both durably held configuration and durably held delivery state.
        sender.target.durability = Terminus.DELIVERIES

Karl Nilsson

unread,
Oct 16, 2023, 11:26:57 AM10/16/23
to rabbitm...@googlegroups.com
what is the value of the topic_name variable?



--
Karl Nilsson

Jack

unread,
Oct 17, 2023, 3:19:21 AM10/17/23
to rabbitmq-users
Thanks for your replies Karl. This value contains the name of the the existing stream, for example test-stream-1. After your earlier comment I also tried /amq/stream/test-stream-1 but it did not work as well.

If i use an existing queue (not stream), for example test-queue-1 it works without problems. As soon as I have a stream I can see the error message from my earlier post:

"inequivalent arg 'x-queue-type' for queue 'test-stream' in vhost '/': received none but current is the value 'stream' of type 'longstr'"

Karl Nilsson

unread,
Oct 17, 2023, 3:45:27 AM10/17/23
to rabbitm...@googlegroups.com
It needs to be /amq/queue/test-stream-1

Jack

unread,
Oct 17, 2023, 4:36:49 AM10/17/23
to rabbitmq-users
Thanks a lot Karl! It works fine now. Thats great news. We tried quite a bit to get this fixed. :)

When the stream does not exist, it will fail, but as long as the stream exists the message will be sent into it. Thats fine for us. Thank you!

One last question, maybe you know even though its a bit qpid specific: How is it possible to add header arguments to the sender object or message? For example x-message-ttl or x-overflow etc.

kjnilsson

unread,
Oct 17, 2023, 9:39:18 AM10/17/23
to rabbitmq-users
You can add message annotations but there aren't really many that the streams understand apart from x-stream-filter (as of 3.13).

Jack

unread,
Oct 17, 2023, 10:42:48 AM10/17/23
to rabbitmq-users
Thank you, I understand.

Priyanka M

unread,
Oct 18, 2023, 5:18:46 AM10/18/23
to rabbitm...@googlegroups.com
Anyone please share me api code for fetching data from rabbitmq using robot framework in python 

Reply all
Reply to author
Forward
0 new messages