const amqp = require("amqplib/callback_api");
let channel = null;
const connect = () => {
amqp.connect("amqp://localhost", function (error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function (error1, ch) {
if (error1) {
throw error1;
}
// Declare the exchange
const exchange = "nlp_preprocessing_exchange";
ch.assertExchange(exchange, "direct", { durable: true });
// Declare the request queue
const requestQueue = "nlp_preprocessing_request_queue";
ch.assertQueue(requestQueue, { durable: true });
// Bind the request queue to the exchange with the routing key 'request'
ch.bindQueue(requestQueue, exchange, "request");
// Declare the result queue
const resultQueue = "nlp_preprocessing_result_queue";
ch.assertQueue(resultQueue, { durable: true });
// Bind the result queue to the exchange with the routing key 'result'
ch.bindQueue(resultQueue, exchange, "result");
channel = ch;
console.log("RabbitMQ connection established successfully");
});
});
};
function getChannel() {
return channel;
}
module.exports = { getChannel, connect };
then when a request is received on a route say /test I send the
request to the test_request queue using channel.sendToQueue()
in my test.services.js.
Right now I am creating a promise in my test.services.js
where I start consuming from the channel and in the
channel.consume() callback
function I resolve the promise whenever it consumes appropriate
response message from the result_queue (identified by userID).
example of test.services.js is as follows
const { getChannel } = require("../utils/Rabbitmq");
const testService = async (userID, fileName, stepName, arguments) => {
// Get RabbitMQ channel
const channel = getChannel();
// Publish message to request_queue
const requestQueue = "test_request_queue";
const resultQueue = "test_result_queue";
const message = {
userID,
fileName,
stepName,
arguments,
};
channel.sendToQueue(requestQueue, Buffer.from(JSON.stringify(message)), {
persistent: true,
});
// Create a promise to handle the response from the Python script
const responsePromise = new Promise((resolve, reject) => {
// Create a listener to handle the response from the Python script
channel.consume(
resultQueue,
(msg) => {
// Parse the response
const response = JSON.parse(msg.content.toString());
// If the response is for the current request, resolve the promise
if (response.userID === userID) {
// Send ACK to RabbitMQ
channel.ack(msg);
channel.cancel(userID);
// Resolve the promise
resolve(response);
console.log("Response in consumer: ", response);
}
},
{ noAck: false, consumerTag: userID }
);
});
// add try catch
try {
// Wait for the response from the Python script
const response = await responsePromise;
return response;
}
catch (error) {
console.log("Error: ", error);
// throw error;
throw new Error("Error: " + error);
}
};
module.exports = { testService };
What I want to know is that how can I create a central consumer callback function in my utils script
(which establishes connection with rabbitmq and defines the exchange & queues) which will supply the response message consumed from the result_queue to my various routes so that the route can return the response consumed from the result_queue to it's appropriate client (say by identifying by a unique requestID)