Hello!
I have been stuck on this one for a few weeks now and would appreciate any guidance from the community.
I have a RabbitMQ Producer using Quarkus(Java Framework) that sends messages through an outgoing channel and also sends a routing key in the metadata of the messages. I am using a virtual-host as well. I've been referencing the
Quarkus RabbitMQ usage documentation.
I have a React consumer that subscribes to routingKeys and expects them in a format using global markers. The routing key seems to be different but I got it to work using a different Producer using the same Producer routingKey. This consumer should remained untouched as it works with a previous consumer that I am replacing.
The Producer sends messages to an outgoing channel I have called "reports" for this example. The Consumer connects to RabbitMQ with a url to the broker and with a virtual host. I have the virtual host configuration set up in my Producer and the broker url is defined in a layer between the consumer and producer.
Right now the behavior looks as follows:
The PRODUCER appears to successfully send outgoing messages and does not give any connection errors. However the consumer does not pick up the messages and does not print out any logs when I add them in.
Does anyone know what could be going wrong or could give any ideas to troubleshoot?
Thank you.
Below is the code.
PRODUCER:routingKey: "dim.*.groups.*.reports.*" where the * represents a unique ID.
PRODUCER CODE:
@Inject
@Channel("reports")
Emitter<Report> emitter;
private OutgoingRabbitMQMetadata buildMetadata(String routingKey) {
return new OutgoingRabbitMQMetadata.Builder()
.withRoutingKey(routingKey)
.withTimestamp(ZonedDateTime.now())
.build();
}
public void sendMessage(Report report, String routingKey) {
OutgoingRabbitMQMetadata metadata = buildMetadata(routingKey);
emitter.send(Message.of(export, Metadata.of(metadata)));
}
PRODUCER APP PROPERTIES:(following the documentation)
rabbitmq-host=my-host.mydomain.com
rabbitmq-port=5672
rabbitmq-username=username
rabbitmq-password=password
rabbitmq-virtual-host=myhost
mp.messaging.outgoing.reports.connector=smallrye-rabbitmqCONSUMER:routingKey: "/topic/dim.*.groups.*.reports.*"
CONSUMER CODE:
const client = new Client();
useEffect(() => {
const wsSubscriptions = {};
const hasProcessing = myList?.linkedDownloads?.filter(dl => dl.status === 'PROCESSING')?.length;
if (hasProcessing) {
client.configure({
brokerURL: globalReducer.dimStore.socketUrl,
connectHeaders: {
login: 'username',
passcode: 'passcode',
host: globalReducer.dimStore.socketVHost
},
heartbeatOutgoing: 10000,
heartbeatIncoming: 10000,
reconnectDelay: 500,
onConnect: () => {
if (myList?.linkedDownloads && client) {
const routingKey = `/topic/dim.*.groups.${groupId}.reports.*`;
wsSubscriptions[groupId] = client.subscribe(routingKey, msg => {
let body = JSON.parse(msg.body);
const newDownloads = myList?.linkedDownloads.map(download => {
if (download.id === body.id) {
updateProgress(body);
return { ...download, ...body };
} else {
return download;
}
});
/* unsubscribe on finish */
if (body.status === 'COMPLETED') {
dispatch(updateMyList({ linkedDownloads: newDownloads }));
wsSubscriptions[groupId].unsubscribe();
delete wsSubscriptions[groupId];
}
});
if (hasProcessing) {
dispatch(fetchActivity(groupId));
}
}
}
});
client.activate();
}
return () => {
Object.keys(wsSubscriptions).forEach(subscriptionKey => {
if (wsSubscriptions[subscriptionKey]) wsSubscriptions[subscriptionKey].unsubscribe();
delete wsSubscriptions[subscriptionKey];
});
client.deactivate();
};
}, [groupId?.linkedDownloads?.length, globalReducer.dimStore]);