Quarkus(Java Framework) RabbitMQ producer not sending messages successfully to React Consumer

231 views
Skip to first unread message

Jake

unread,
Jul 7, 2023, 4:28:43 PM7/7/23
to rabbitmq-users
Hello!

I have been stuck on this one for a few weeks now and would appreciate any guidance from the community.

I have a RabbitMQ Producer using Quarkus(Java Framework) that sends messages through an outgoing channel and also sends a routing key in the metadata of the messages. I am using a virtual-host as well. I've been referencing the Quarkus RabbitMQ usage documentation.

I have a React consumer that subscribes to routingKeys and expects them in a format using global markers. The routing key seems to be different but I got it to work using a different Producer using the same Producer routingKey. This consumer should remained untouched as it works with a previous consumer that I am replacing.

The Producer sends messages to an outgoing channel I have called "reports" for this example. The Consumer connects to RabbitMQ with a url to the broker and with a virtual host. I have the virtual host configuration set up in my Producer and the broker url is defined in a layer between the consumer and producer.

Right now the behavior looks as follows:
The PRODUCER appears to successfully send outgoing messages and does not give any connection errors. However the consumer does not pick up the messages and does not print out any logs when I add them in.
Does anyone know what could be going wrong or could give any ideas to troubleshoot?

Thank you.
Below is the code.

PRODUCER:
routingKey: "dim.*.groups.*.reports.*" where the * represents a unique ID.

PRODUCER CODE:
@Inject
@Channel("reports")
Emitter<Report> emitter;

private OutgoingRabbitMQMetadata buildMetadata(String routingKey) {
return new OutgoingRabbitMQMetadata.Builder()
    .withRoutingKey(routingKey)
    .withTimestamp(ZonedDateTime.now())
    .build();
}

public void sendMessage(Report reportString routingKey) {
    OutgoingRabbitMQMetadata metadata = buildMetadata(routingKey);
    emitter.send(Message.of(exportMetadata.of(metadata)));
}

PRODUCER APP PROPERTIES:(following the documentation)
rabbitmq-host=my-host.mydomain.com
rabbitmq-port=5672
rabbitmq-username=username
rabbitmq-password=password
rabbitmq-virtual-host=myhost

mp.messaging.outgoing.reports.connector=smallrye-rabbitmq



CONSUMER:
routingKey: "/topic/dim.*.groups.*.reports.*"

CONSUMER CODE:
const client = new Client();
useEffect(() => {
const wsSubscriptions = {};
const hasProcessing = myList?.linkedDownloads?.filter(dl => dl.status === 'PROCESSING')?.length;
if (hasProcessing) {
client.configure({
brokerURL: globalReducer.dimStore.socketUrl,
connectHeaders: {
login: 'username',
passcode: 'passcode',
host: globalReducer.dimStore.socketVHost
},
heartbeatOutgoing: 10000,
heartbeatIncoming: 10000,
reconnectDelay: 500,
onConnect: () => {
if (myList?.linkedDownloads && client) {
const routingKey = `/topic/dim.*.groups.${groupId}.reports.*`;
wsSubscriptions[groupId] = client.subscribe(routingKey, msg => {
let body = JSON.parse(msg.body);
const newDownloads = myList?.linkedDownloads.map(download => {
if (download.id === body.id) {
updateProgress(body);
return { ...download, ...body };
} else {
return download;
}
});
/* unsubscribe on finish */
if (body.status === 'COMPLETED') {
dispatch(updateMyList({ linkedDownloads: newDownloads }));
wsSubscriptions[groupId].unsubscribe();
delete wsSubscriptions[groupId];
}
});
if (hasProcessing) {
dispatch(fetchActivity(groupId));
}
}
}
});
client.activate();
}

return () => {
Object.keys(wsSubscriptions).forEach(subscriptionKey => {
if (wsSubscriptions[subscriptionKey]) wsSubscriptions[subscriptionKey].unsubscribe();
delete wsSubscriptions[subscriptionKey];
});
client.deactivate();
};
}, [groupId?.linkedDownloads?.length, globalReducer.dimStore]);

Arnaud Cogoluègnes

unread,
Jul 10, 2023, 9:35:44 AM7/10/23
to rabbitmq-users
The producer uses AMQP and the consumer uses (I think) MQTT over web sockets. Please share more information on the JavaScript library you are using to consume messages.

When creating a consumer you should have a look at the created topology (binding and queue) and see if it matches what you expect.

The mapping between MQTT (if you are using it) and AMQP is not obvious, you should have a look at the documentation [1] [2] and experiment with simple code.

Message has been deleted

Arnaud Cogoluègnes

unread,
Jul 11, 2023, 4:08:38 AM7/11/23
to rabbitmq-users
My bad, so the consumer uses STOMP. You should then read the documentation on the way RabbitMQ deals with STOMP topics [1]. Note STOMP uses AMQP 0.9.1 topic exchanges [2]. Wildcards are supported in the bindings for e.g. fanout scenarios but they are not supported in the subscription destination name. Again, you should check the topology created by your applications e.g. on the management UI.
Le lundi 10 juillet 2023 à 16:44:35 UTC+2, Jake a écrit :
I've never heard of MQTT before. I will check that out.

The JavaScript library I am using to consume messages is stompjs. 
import { Client } from '@stomp/stompjs';
I then instantiate the stomp client in the code I posted. 
Reply all
Reply to author
Forward
0 new messages