Central consumer callback for NodeJS server

46 views
Skip to first unread message

Zeeshan Ibrahim

unread,
Mar 26, 2023, 9:48:05 PM3/26/23
to rabbitmq-users
My scenario is that I am using the MVC design pattern for my Node.js server, which uses services and controllers for user routes. A client sends a request to a route, say /test; this is handled by test.controller.js, which in turn calls test.services.js, which handles the business logic.

In my server I have a utils script that establishes a connection with RabbitMQ using "amqplib" and defines one exchange, test, and two queues, test_requests and test_results. This script also exposes the channel on which the exchange is declared (through a getChannel() function). I call this utils script from my server.js file to establish the connection with RabbitMQ. The utils script is as follows:
const amqp = require("amqplib/callback_api");

// Module-level channel shared by everything in this file. It stays null until
// the createChannel callback inside connect() assigns the opened channel.
let channel = null;

/**
 * Opens a connection to the broker at amqp://localhost, creates a channel on
 * it, declares the durable direct exchange together with the request and
 * result queues, binds each queue to the exchange, and finally stores the
 * channel in the module-level `channel` variable.
 *
 * Takes no arguments and returns nothing. A connection or channel error is
 * thrown from inside the corresponding callback.
 */
const connect = () => {
  const EXCHANGE_NAME = "nlp_preprocessing_exchange";

  // [queue name, routing key] pairs, listed in the order they are declared.
  const QUEUE_BINDINGS = [
    ["nlp_preprocessing_request_queue", "request"],
    ["nlp_preprocessing_result_queue", "result"],
  ];

  // Runs once the channel has been created (or has failed to be created).
  const onChannelCreated = (channelError, openedChannel) => {
    if (channelError) {
      throw channelError;
    }

    openedChannel.assertExchange(EXCHANGE_NAME, "direct", { durable: true });

    // Declare each queue, then bind it to the exchange under its routing key.
    for (const [queueName, routingKey] of QUEUE_BINDINGS) {
      openedChannel.assertQueue(queueName, { durable: true });
      openedChannel.bindQueue(queueName, EXCHANGE_NAME, routingKey);
    }

    // Publish the channel to the rest of the module only after the
    // declarations above have been issued.
    channel = openedChannel;
    console.log("RabbitMQ connection established successfully");
  };

  amqp.connect("amqp://localhost", (connectError, connection) => {
    if (connectError) {
      throw connectError;
    }
    connection.createChannel(onChannelCreated);
  });
};

/**
 * Returns the current value of the module-level `channel` variable.
 *
 * @returns {object|null} The channel stored by connect(), or null if no
 *   channel has been stored yet (the variable is initialised to null and is
 *   only assigned inside connect()'s createChannel callback).
 */
function getChannel() {
  return channel;
}

// Public API of this module: the channel accessor and the connection bootstrap.
module.exports = { getChannel, connect };

Then, when a request is received on a route, say /test, I send the
request to the test_request queue using channel.sendToQueue()
in my test.services.js.

Right now I create a promise in my test.services.js
in which I start consuming from the channel. In the
channel.consume() callback
function I resolve the promise as soon as it consumes the appropriate
response message from the result_queue (identified by userID).
An example of test.services.js is as follows:


const { getChannel } = require("../utils/Rabbitmq");

/**
 * Publishes a processing request to the request queue and waits for the
 * response addressed to the same user on the result queue.
 *
 * @param {*} userID - Identifier copied into the request; a response is
 *   accepted only when its `userID` is strictly equal to this value.
 * @param {*} fileName - Copied into the request as `fileName`.
 * @param {*} stepName - Copied into the request as `stepName`.
 * @param {*} args - Copied into the request under the key `arguments`. (The
 *   parameter itself is not called `arguments` because that name is a syntax
 *   error in strict mode and ES modules.)
 * @returns {Promise<object>} The parsed response message.
 * @throws {Error} If no channel is available, if registering the consumer
 *   fails, or if the broker cancels the consumer before a response arrives.
 *   Consumer failures are wrapped; the underlying error is kept in `cause`.
 *
 * Note: there is no timeout, so the returned promise stays pending for as long
 * as no matching response is delivered.
 */
const testService = async (userID, fileName, stepName, args) => {
  // Get RabbitMQ channel
  const channel = getChannel();
  if (!channel) {
    throw new Error("RabbitMQ channel is not available yet");
  }

  // Publish message to request_queue
  const requestQueue = "test_request_queue";
  const resultQueue = "test_result_queue";
  const message = {
    userID,
    fileName,
    stepName,
    arguments: args,
  };
  channel.sendToQueue(requestQueue, Buffer.from(JSON.stringify(message)), {
    persistent: true,
  });

  // Create a promise that settles when the matching response is consumed
  const responsePromise = new Promise((resolve, reject) => {
    // Guards against settling twice and against swallowing deliveries that
    // arrive between our cancel request and the broker honouring it.
    let settled = false;

    const onMessage = (msg) => {
      // A null message means the broker cancelled this consumer.
      if (msg === null) {
        if (!settled) {
          settled = true;
          reject(
            new Error("Consumer was cancelled by the broker before a response arrived")
          );
        }
        return;
      }

      // Already done: hand any late delivery back to the queue.
      if (settled) {
        channel.nack(msg, false, true);
        return;
      }

      let response;
      try {
        response = JSON.parse(msg.content.toString());
      } catch (parseError) {
        // Nobody can process a malformed payload; discard it without requeue
        // so it is not redelivered forever.
        channel.nack(msg, false, false);
        return;
      }

      // Not ours: requeue it so the consumer waiting for it can receive it.
      // Leaving it unacknowledged would keep it held on this shared channel
      // even after this consumer is cancelled.
      if (!response || response.userID !== userID) {
        channel.nack(msg, false, true);
        return;
      }

      settled = true;
      // Send ACK to RabbitMQ
      channel.ack(msg);
      // Cancel using the tag carried by the delivery itself. The tag is
      // assigned by the broker, so concurrent requests from the same user do
      // not collide on a duplicate consumer tag.
      channel.cancel(msg.fields.consumerTag);
      resolve(response);
      console.log("Response in consumer: ", response);
    };

    channel.consume(resultQueue, onMessage, { noAck: false }, (consumeError) => {
      // Surface a failed consumer registration instead of waiting forever.
      if (consumeError && !settled) {
        settled = true;
        reject(consumeError);
      }
    });
  });

  try {
    // Wait for the response
    return await responsePromise;
  } catch (error) {
    console.log("Error: ", error);
    // Keep the original message format but preserve the underlying error.
    const wrapped = new Error(`Error: ${error}`);
    wrapped.cause = error;
    throw wrapped;
  }
};

// Expose the service function to modules that require() this file.
module.exports = { testService };

What I want to know is how I can create a central consumer callback function in my utils script Rabbitmq.js (which establishes the connection with RabbitMQ and defines the exchange and queues) that will supply each response message consumed from the result_queue to my various routes, so that each route can return the response to its appropriate client (say, by identifying it with a unique requestID).

Zeeshan Ibrahim

unread,
Mar 26, 2023, 9:51:36 PM3/26/23
to rabbitmq-users
Sorry about the different queue and exchange names in the two scripts provided above; assume that in both scripts they are named test_exchange, request_queue, and result_queue.
Reply all
Reply to author
Forward
0 new messages