Sure. Here are the most important parts of our Qpid client configuration. It works fine with normal "classic" queues: the queue is created if it does not exist, and if it already exists the message is added to the existing queue.
When we create a stream in RabbitMQ and then try to publish a message to it with the same code, we get the error above, which says that the argument x-queue-type = stream is missing:
"inequivalent arg 'x-queue-type' for queue 'test-stream' in vhost '/': received none but current is the value 'stream' of type 'longstr'"
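To illustrate how we read this message, here is a minimal sketch of an argument equivalence check. It is only a model and not RabbitMQ's actual implementation; the function name `check_equivalent` and the dictionaries are made up for illustration. The assumption is that attaching the sender triggers a queue declare without any arguments, which is then compared against the arguments of the existing stream.

```python
def check_equivalent(queue, existing_args, declared_args):
    """Raise ValueError if a redeclare's arguments differ from the existing queue's.

    Simplified model for illustration only, not broker code.
    """
    for key in sorted(set(existing_args) | set(declared_args)):
        current = existing_args.get(key)
        received = declared_args.get(key)
        if received != current:
            raise ValueError(
                f"inequivalent arg '{key}' for queue '{queue}': "
                f"received {received!r} but current is {current!r}"
            )


# The stream was created with x-queue-type=stream ...
existing = {"x-queue-type": "stream"}

# ... but the declare that (presumably) happens on sender attach carries no arguments.
try:
    check_equivalent("test-stream", existing, {})
    message = None
except ValueError as err:
    message = str(err)

print(message)
```

In this model a declare that repeats `{"x-queue-type": "stream"}` passes, while a declare with no arguments is rejected, which matches what we see for the stream but not for classic queues.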
Below are partial code snippets. The code fails (runs into a timeout) at the moment create_sender is executed.
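from proton import Message, SSLDomain, Terminus
from proton.reactor import SenderOption
from proton.utils import BlockingConnection


class CapabilityOptions(SenderOption):
    def apply(self, sender):
        # Terminus.DELIVERIES: a terminus with both durably held
        # configuration and durably held delivery state.
        sender.target.durability = Terminus.DELIVERIES


# TLS setup: client certificate, private key and the CA used to verify the broker.
my_domain = SSLDomain(SSLDomain.MODE_CLIENT)
my_domain.set_credentials(
    cert_file="/app/swim-adapter/certs/client_certificate.pem",
    key_file="/app/swim-adapter/certs/client_key.pem",
    password=None,
)
my_domain.set_peer_authentication(SSLDomain.VERIFY_PEER)
my_domain.set_trusted_ca_db("/app/swim-adapter/certs/ca.crt")

# AMQP_URL, AMQP_USER, AMQP_PASSWORD, topic_name and event come from the
# surrounding application and are not part of this excerpt.
try:
    conn = BlockingConnection(
        AMQP_URL,
        ssl_domain=my_domain,
        heartbeat=60,
        virtual_host="",
        allowed_mechs="PLAIN",
        allow_insecure_mechs=False,
        user=AMQP_USER,
        password=AMQP_PASSWORD,
        sasl_enabled=True,
    )
    # This is the call that fails / times out when the target is a stream.
    topic_sender = conn.create_sender(
        topic_name, options=CapabilityOptions()
    )
    delivery = topic_sender.send(
        Message(body=event.data)
    )
except Exception:
    # The original error handling is not shown in this excerpt.
    raise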
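
One avenue that might be worth checking is the form of the target address passed to create_sender. As far as the RabbitMQ 3.x AMQP 1.0 plugin's addressing scheme is understood here, a target of the form `/queue/<name>` (or a bare `<name>`) declares the queue on attach, while `/amq/queue/<name>` targets a queue that must already exist and is not redeclared. That is an assumption that has not been verified against this broker, and other broker versions may use a different address format. The helper `target_address` and the queue name `events` are hypothetical and only illustrate the idea.

```python
def target_address(queue_name: str, predeclared: bool) -> str:
    """Build a target address for the sender.

    Assumption (RabbitMQ 3.x AMQP 1.0 plugin addressing, not verified here):
      /queue/<name>      the broker declares the queue when the link attaches
      /amq/queue/<name>  the queue must already exist and is not redeclared
    """
    prefix = "/amq/queue/" if predeclared else "/queue/"
    return prefix + queue_name


# A stream created beforehand in RabbitMQ would be addressed as pre-declared.
stream_target = target_address("test-stream", predeclared=True)

# A classic queue that should be created on demand keeps the declaring form.
classic_target = target_address("events", predeclared=False)

print(stream_target)
print(classic_target)
```

Under that assumption, `stream_target` would be passed to create_sender in place of the plain queue name, so that no argument-less declare is issued for the existing stream.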