Delay in creating channel and queue


Sateesh Kumar

Sep 14, 2023, 2:52:10 AM
to rabbitmq-users
Hello,
Before I report the error, let me explain the use case:

Use case
We have a .NET API that receives requests and pushes them to RabbitMQ for our second application (the engine) to consume. The engine takes each request message and publishes its response back to RabbitMQ. The response queue is already set when the .NET API pushes the request, so the engine knows where to publish the response.

Code Design:
A single RabbitMQ connection (singleton) is used for all of the .NET API's requests.
A channel is created and closed for every request in the push and get methods (see below).

Extra pointer
Other applications use the same RabbitMQ instance as well.

Versions:
RabbitMQ 3.11.20, Erlang 25.2.1
.NET RabbitMQ client 3.6.0

Problem
When there is an enormous amount of customer traffic on our applications, channels and their queues are created slowly. My API code and logs below show this.



Traffic:
There can be 6k+ connections from different applications, and the same goes for channels and publishes: roughly 200 messages per second and 4000 consumers.

I have attached logs for RabbitMQ and the .NET API where you can see that these operations sometimes take a long time (see the attached screenshot, delay in rmq.png).

API code for rmq communication

This method pushes data
public bool pushMessageToRBMQ(MessageObject jObj, string requestId, string requestQueue = "", string id = "", string callingController = "")
{
    log.Info($"{callingController} {id} pushMessageToRBMQ");

    bool res = false;
    var isConnected = IsRMQConnected(id, callingController);

    if (!isConnected)
    {
        log.Error($"{callingController} {id} connection could not be established while pushing msg");
    }
    else
    {
        IModel channel = connection.CreateModel();

        try
        {

            //Properties for message in request queue
            Dictionary<String, Object> args = new Dictionary<String, Object>();
            args.Add("x-message-ttl", messageExpiry);

            if (channel != null)
                log.Info($"{callingController} {id} channel created before pushing");
            else
                log.Error($"{callingController} {id} channel could not be created before pushing");

            //Declaring and creating queue , if already exists resume operation
            var queueInfo = channel.QueueDeclare(queue: requestQueue,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: args);

            if( queueInfo != null )
            {
                if (string.IsNullOrEmpty(queueInfo.QueueName))
                {
                    log.Info($"{callingController} {id} request queue could not be created");
                }
                else
                {
                    log.Info($"{callingController} {id} request queue created: {queueInfo.QueueName}");
                }
            }
            else
            {
                log.Info($"{callingController} {id} request queue(null) could not  be created");
            }

            String message = ((JObject)jObj.getContent()).ToString(Newtonsoft.Json.Formatting.None, null);
            String messageType = ((JValue)jObj.getType()).ToString(Newtonsoft.Json.Formatting.None, null);
            var body = Encoding.UTF8.GetBytes(message.ToString());

            String corrId = Guid.NewGuid().ToString();
            IBasicProperties props = channel.CreateBasicProperties();

            //Appending request data with certain properties in order to communicate with Engine
            props.CorrelationId = corrId;
            props.DeliveryMode = 2;
            //props.Persistent = false;
            jObj.setId(requestId);
            props.MessageId = jObj.getId().ToString();
            props.ReplyTo = jObj.getId().ToString();
            props.Type = jObj.getType();

            channel.ContinuationTimeout = TimeSpan.FromSeconds(rmqPushMsgTimeout);

            channel.BasicPublish(this._requestExchange, requestQueue, props, body);
            log.Info($"{callingController} {id} message published to exchange");

            channel.ConfirmSelect(); // It will raise exception if channel not active.

            jObj.setTimeAsCurrentTime();
            jObj.replyHandler = null;

            //Publish the message data to request Queue
            log.Info($"{callingController} {id} Data Published: " + message + " : " + messageType);

            res = true;
        }
        catch (Exception ex)
        {
            log.Error($"{callingController} {id} Error while pushing to rmq: {ex.Message}  " + ex);
            res = false;
        }
        int retryCount = channelCloseRetryCount;

        while (retryCount > 0 && channel != null)
        {
            try
            {

                if (channel != null)
                {
                    channel.Close();
                    log.Info($"{callingController} {id} channel closed");
                    break;
                }
                else
                    log.Info($"{callingController} {id} channel already closed");
            }
            catch (Exception err)
            {
                log.Error($"{callingController} {id} {requestId} error while closing channel: {err.Message}  " + err);
                retryCount--;
                Thread.Sleep(channelCloseRetryTime);
            }

        }
    }
    return res;
}
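
pushMessageToRBMQ above calls ConfirmSelect() after BasicPublish(), so the message it has just published is not covered by publisher confirms. If the intent is to learn whether the broker accepted the message, the order would need to be reversed and the confirm awaited. A minimal sketch, assuming a client version that still exposes the synchronous IModel API (ConfirmSelect, WaitForConfirmsOrDie); the class and method names are hypothetical and not part of the original code:

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

// Hypothetical helper, not from the original post.
public static class ConfirmedPublish
{
    // Publishes one message and blocks until the broker confirms it.
    public static void Publish(IModel channel, string exchange, string routingKey,
                               IBasicProperties props, string json, TimeSpan timeout)
    {
        // Confirms must be enabled before publishing; messages published
        // earlier on this channel are not tracked.
        channel.ConfirmSelect();

        byte[] body = Encoding.UTF8.GetBytes(json);
        channel.BasicPublish(exchange, routingKey, props, body);

        // Throws if the broker nacks the message or the timeout elapses.
        channel.WaitForConfirmsOrDie(timeout);
    }
}
```

Waiting for a confirm on every message adds a round trip per request, so under the traffic described above this is a trade-off between certainty and latency rather than a free improvement.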

This method gets the response from engine
public responseJson getMessagesFromRBMQ(string requestId, string id = "", string callingController = "")
{
    log.Info($"{callingController} {id} getMessagesFromRBMQ");

    var isConnected = IsRMQConnected(id, callingController);

    responseJson res = new responseJson();

    if (!isConnected)
    {
        log.Error($"{callingController} {id} connection could not be established while getting msg");
        res.message = "Connection failed";
        res.status = "Failure";
        res.data = null;
        return res;
    }

    var jsonObj = "";
   
    bool continueResponseCheck = true;

    double diffInSeconds = 0.0;

    DateTime dtFirstJsonFetchTime = DateTime.Now;
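    // As posted, both variables were null here, so attaching the handler below
    // would throw a NullReferenceException. Creating them first is an assumption
    // about the setup omitted from the post (queue declaration and the
    // BasicConsume call are not shown either).
    IModel channel = connection.CreateModel();
    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        jsonObj = Encoding.UTF8.GetString(body);
        log.Info($"{callingController} {id} Data Received: " + jsonObj.ToString());
    };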

    while (continueResponseCheck)
    {
        try
        {
            diffInSeconds = (DateTime.Now - dtFirstJsonFetchTime).TotalSeconds;

            if (jsonObj == null || jsonObj == "" || jsonObj == "{}")
            {
                // wait for completing 10 seconds.
                if (diffInSeconds > requestTimeOut)
                {
                    if (jsonObj == "{}")
                    {
                        log.Error($"{callingController} {id} Error while getting msg: " + "Something went wrong");
                        continueResponseCheck = false;
                        res.message = "Something went wrong";
                        res.status = "Failure";
                        res.data = null;
                    }
                    else
                    {
                        log.Error($"{callingController} {id} Error while getting msg: " + "Connection timeout(hardcoded)");
                        continueResponseCheck = false;
                        res.message = "Connection timeout";
                        res.status = "Failure";
                        res.data = null;
                    }
                }
                else
                {
                    continueResponseCheck = true;
                }
            }
            else
            {

               
                continueResponseCheck = false;
               
                var JSONwithErr = JObject.Parse(jsonObj);

                if (JSONwithErr.ContainsKey("error"))
                {
                    if (JSONwithErr["error"].ToString().ToLower().Equals("true"))
                    {
                        res.message = (string)JSONwithErr["message"];
                        res.status = "Failure";
                        res.data = null;
                        log.Error($"{callingController} {id} Error while getting msg (rmq has error):" + res.message);
                    }
                    else
                    {
                        log.Info($"{callingController} {id} Data sent successfully");
                        res.message = "Data sent successfully";
                        res.status = "Success";
                        res.data = JSONwithErr;
                    }
                }
                else
                {
                    log.Info($"{callingController} {id} Data sent successfully");
                    res.message = "Data sent successfully";
                    res.status = "Success";
                    res.data = JSONwithErr;
                }
            }
        }
        catch (Exception err)
        {
            // wait for completing 10 seconds.
            if (diffInSeconds > requestTimeOut)
            {
                log.Error($"{callingController} {id} Error while getting msg(catch): {err.Message} " + err);
                continueResponseCheck = false;
                res.message = err.Message;
                res.status = "Failure";
                res.data = null;
            }
            else
            {
                continueResponseCheck = true;
            }
        }
        Thread.Sleep(5);
    }
   
    log.Info($"{callingController} {id} outside loop");

    try
    {
        var messageCount = channel.QueueDelete(requestId, false, false);
        log.Info($"{callingController} {id} {requestId} deleted with {messageCount} messages.");
    }
    catch (Exception err)
    {
        log.Error($"{callingController} {id} {requestId} error while deleting response queue: {err.Message}  " + err);
    }


    //int retryCount = channelCloseRetryCount;

    //while (retryCount > 0 & channel != null)
    //{
    //    try
    //    {

    //        if (consumer != null && consumer.ConsumerTag != null)
    //        {
    //            channel.BasicCancel(consumer.ConsumerTag);

    //            log.Info($"{callingController} {id} consumer closed");
    //        }
    //        else
    //            log.Info($"{callingController} {id} consumer is null");

    //        if (channel != null)
    //        {
    //            channel.Close();
    //            log.Info($"{callingController} {id} response channel closed");
    //        }
    //        else
    //            log.Info($"{callingController} {id} response channel already closed");
    //    }
    //    catch (Exception err)
    //    {
    //        log.Error($"{callingController} {id} {requestId} error while closing channel: {err.Message}  " + err);
    //        retryCount--;
    //        Thread.Sleep(channelCloseRetryTime);
    //    }
    //}
    return res;
}
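
The loop in getMessagesFromRBMQ polls a shared string every 5 ms while the consumer callback writes to it from another thread. One alternative is to let the callback signal the waiting thread directly. The sketch below uses only the .NET base class library; PendingResponse and its members are hypothetical names, and the simulated reply stands in for the consumer's Received handler:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not from the original post: holds one pending response.
public sealed class PendingResponse
{
    private readonly TaskCompletionSource<string> _tcs =
        new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Intended to be called from the consumer's Received handler.
    public void Complete(string json)
    {
        _tcs.TrySetResult(json);
    }

    // Blocks until a response arrives or the timeout elapses.
    // Returns null on timeout.
    public string WaitFor(TimeSpan timeout)
    {
        return _tcs.Task.Wait(timeout) ? _tcs.Task.Result : null;
    }
}

public static class Demo
{
    public static void Main()
    {
        var pending = new PendingResponse();

        // Simulates the engine replying after 50 ms on another thread.
        Task.Run(async () =>
        {
            await Task.Delay(50);
            pending.Complete("{\"error\":\"false\"}");
        });

        string json = pending.WaitFor(TimeSpan.FromSeconds(10));
        Console.WriteLine(json ?? "Connection timeout");
    }
}
```

With this shape the waiting thread wakes as soon as the response is set, and the timeout branch replaces the manual elapsed-time check.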
delay in rmq.png

Michal Kuratczyk

Sep 14, 2023, 3:50:26 AM
to rabbitm...@googlegroups.com
It sounds like you're unnecessarily overloading RabbitMQ. Have a look at:

Best,


--
Michał
RabbitMQ team
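
The reply does not say which part of the design causes the overload. One likely candidate, given the description in the original post, is opening and closing a channel for every request. A minimal sketch of reusing long-lived channels instead, assuming the synchronous IModel API of the .NET client; ChannelCache is a hypothetical name, and one channel per thread is used because channels should not be shared between threads that publish concurrently:

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;

// Hypothetical wrapper, not from the original post: keeps one long-lived
// channel per thread instead of one channel per request.
public sealed class ChannelCache : IDisposable
{
    private readonly IConnection _connection;
    private readonly ThreadLocal<IModel> _channels;

    public ChannelCache(IConnection connection)
    {
        _connection = connection;
        // trackAllValues lets Dispose() reach every thread's channel.
        _channels = new ThreadLocal<IModel>(() => _connection.CreateModel(), true);
    }

    // Returns the calling thread's channel, replacing it if it was closed.
    public IModel Get()
    {
        IModel channel = _channels.Value;
        if (!channel.IsOpen)
        {
            channel.Dispose();
            channel = _connection.CreateModel();
            _channels.Value = channel;
        }
        return channel;
    }

    public void Dispose()
    {
        foreach (IModel channel in _channels.Values)
        {
            channel.Dispose();
        }
        _channels.Dispose();
    }
}
```

A caller would ask for cache.Get() at the start of each publish rather than calling connection.CreateModel(), so the number of channels stays bounded by the number of worker threads instead of growing with request volume.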