Sateesh Kumar
unread,Sep 14, 2023, 2:52:10 AM9/14/23Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to rabbitmq-users
Hello,
Before I report the error, let me explain the use case:
Use case
We have a .NET API which receives a request and pushes it to RabbitMQ for our second application (the engine) to consume. The engine gets this request/message and publishes its response back to RabbitMQ. The response queue is already set when the .NET API pushes the request, so the engine knows where to publish the response.
Code Design:
A single RMQ connection (singleton) is used for all of the .NET API's requests
A channel is created and closed for every request in the push and get methods (see below)
Extra pointer
different applications use the same rmq as well.
Versions:
Rmq 3.11.20 erlang 25.2.1
.net rmq client 3.6.0
Problem
So, when there is an enormous amount of customer traffic on our applications, channels are created slowly, and so are their queues. My API code and logs will show and prove that.
Traffic:
The 6k+ connections can come from different applications, and the same goes for the publishes and channels, which amount to about 200 messages per second and 4000 consumers.
I have attached logs for RabbitMQ and the .NET API, where you can see that it sometimes takes time (see the delay in the RMQ screenshot).
API code for rmq communication
This method pushes data
/// <summary>
/// Publishes a request message to RabbitMQ on a short-lived channel.
/// </summary>
/// <remarks>
/// Declares the durable request queue (with an <c>x-message-ttl</c> argument), publishes the
/// serialized content of <paramref name="jObj"/> to the request exchange with
/// <paramref name="requestQueue"/> as the routing key, and finally closes the channel,
/// retrying the close up to <c>channelCloseRetryCount</c> times.
/// </remarks>
/// <param name="jObj">Message wrapper; its content is serialized as the body and its id is set from <paramref name="requestId"/>.</param>
/// <param name="requestId">Id assigned to <paramref name="jObj"/>; the message's MessageId and ReplyTo are then read back from it.</param>
/// <param name="requestQueue">Name of the request queue to declare; also used as the routing key.</param>
/// <param name="id">Caller-supplied identifier, used only as a log prefix.</param>
/// <param name="callingController">Name of the calling controller, used only as a log prefix.</param>
/// <returns><c>true</c> when every step up to and including <c>ConfirmSelect</c> completed without an exception; otherwise <c>false</c>.</returns>
public bool pushMessageToRBMQ(MessageObject jObj, string requestId, string requestQueue = "", string id = "", string callingController = "")
{
    log.Info($"{callingController} {id} pushMessageToRBMQ");

    if (!IsRMQConnected(id, callingController))
    {
        log.Error($"{callingController} {id} connection could not be established while pushing msg");
        return false;
    }

    bool res = false;
    IModel channel = null;
    try
    {
        // Created inside the try so that a channel-creation failure is logged and reported
        // as 'false' like every other publishing failure instead of escaping to the caller.
        channel = connection.CreateModel();
        if (channel == null)
        {
            // Bail out here: continuing would only fail with a NullReferenceException.
            log.Error($"{callingController} {id} channel could not be created before pushing");
            return false;
        }
        log.Info($"{callingController} {id} channel created before pushing");

        // Properties for message in request queue
        Dictionary<String, Object> args = new Dictionary<String, Object>();
        args.Add("x-message-ttl", messageExpiry);

        // Declaring and creating queue, if already exists resume operation
        var queueInfo = channel.QueueDeclare(queue: requestQueue,
                                             durable: true,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: args);
        if (queueInfo == null)
        {
            log.Info($"{callingController} {id} request queue(null) could not be created");
        }
        else if (string.IsNullOrEmpty(queueInfo.QueueName))
        {
            log.Info($"{callingController} {id} request queue could not be created");
        }
        else
        {
            log.Info($"{callingController} {id} request queue created: {queueInfo.QueueName}");
        }

        String message = ((JObject)jObj.getContent()).ToString(Newtonsoft.Json.Formatting.None, null);
        String messageType = ((JValue)jObj.getType()).ToString(Newtonsoft.Json.Formatting.None, null);
        var body = Encoding.UTF8.GetBytes(message);
        String corrId = Guid.NewGuid().ToString();

        // Appending request data with certain properties in order to communicate with Engine
        IBasicProperties props = channel.CreateBasicProperties();
        props.CorrelationId = corrId;
        props.DeliveryMode = 2; // 2 = persistent delivery mode
        jObj.setId(requestId);
        props.MessageId = jObj.getId().ToString();
        props.ReplyTo = jObj.getId().ToString();
        props.Type = jObj.getType();

        channel.ContinuationTimeout = TimeSpan.FromSeconds(rmqPushMsgTimeout);

        // Publish the message data to request queue
        channel.BasicPublish(this._requestExchange, requestQueue, props, body);
        log.Info($"{callingController} {id} message published to exchange");

        // Used as a liveness probe: it will raise an exception if the channel is not active.
        // NOTE(review): confirm mode enabled after the publish presumably does not cover the
        // message published above, and nothing waits for a confirm - verify whether real
        // publisher confirms (ConfirmSelect before publish + WaitForConfirms) are wanted.
        channel.ConfirmSelect();

        jObj.setTimeAsCurrentTime();
        jObj.replyHandler = null;
        log.Info($"{callingController} {id} Data Published: " + message + " : " + messageType);
        res = true;
    }
    catch (Exception ex)
    {
        log.Error($"{callingController} {id} Error while pushing to rmq: {ex.Message} " + ex);
        res = false;
    }
    finally
    {
        // Always try to release the per-request channel, retrying a bounded number of times.
        int retryCount = channelCloseRetryCount;
        while (channel != null && retryCount > 0)
        {
            try
            {
                channel.Close();
                log.Info($"{callingController} {id} channel closed");
                break;
            }
            catch (Exception err)
            {
                log.Error($"{callingController} {id} {requestId} error while closing channel: {err.Message} " + err);
                retryCount--;
                Thread.Sleep(channelCloseRetryTime);
            }
        }
    }

    return res;
}
This method gets the response from engine
/// <summary>
/// Waits for the engine's response on the queue named <paramref name="requestId"/> and
/// converts it into a <c>responseJson</c>.
/// </summary>
/// <remarks>
/// Opens a channel, subscribes a consumer to the response queue and polls every 5 ms until a
/// non-empty payload (anything other than <c>""</c> or <c>"{}"</c>) arrives or more than
/// <c>requestTimeOut</c> seconds have elapsed. Afterwards the consumer is cancelled, the
/// response queue is deleted and the channel is closed.
/// </remarks>
/// <param name="requestId">Name of the response queue to consume from and delete afterwards.</param>
/// <param name="id">Caller-supplied identifier, used only as a log prefix.</param>
/// <param name="callingController">Name of the calling controller, used only as a log prefix.</param>
/// <returns>
/// A result whose <c>status</c> is "Success" with the parsed JSON in <c>data</c>, or "Failure"
/// with an explanatory <c>message</c> (connection failure, timeout, error flagged by the
/// payload, or an exception message).
/// </returns>
public responseJson getMessagesFromRBMQ(string requestId, string id = "", string callingController = "")
{
    log.Info($"{callingController} {id} getMessagesFromRBMQ");
    var isConnected = IsRMQConnected(id, callingController);
    responseJson res = new responseJson();
    if (!isConnected)
    {
        log.Error($"{callingController} {id} connection could not be established while getting msg");
        res.message = "Connection failed";
        res.status = "Failure";
        res.data = null;
        return res;
    }

    // Written by the consumer callback (client thread) and read by the polling loop below.
    var jsonObj = "";
    IModel channel = null;
    EventingBasicConsumer consumer = null;

    try
    {
        // The channel and consumer must exist before the handler is attached; previously both
        // were null at this point, so the subscription threw a NullReferenceException.
        channel = connection.CreateModel();
        consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            jsonObj = Encoding.UTF8.GetString(body);
            log.Info($"{callingController} {id} Data Received: " + jsonObj.ToString());
        };

        // NOTE(review): assumes the response queue named requestId has already been declared
        // elsewhere (e.g. by the engine or by a declaration not shown here) - confirm.
        // Second argument enables automatic acknowledgement; the queue is deleted afterwards.
        channel.BasicConsume(requestId, true, consumer);

        // Elapsed time is measured in UTC so that local clock shifts (DST) cannot distort it.
        DateTime dtFirstJsonFetchTime = DateTime.UtcNow;
        bool continueResponseCheck = true;
        while (continueResponseCheck)
        {
            double diffInSeconds = (DateTime.UtcNow - dtFirstJsonFetchTime).TotalSeconds;
            try
            {
                string received = jsonObj;
                if (string.IsNullOrEmpty(received) || received == "{}")
                {
                    // Nothing usable yet: keep polling until the timeout has passed.
                    if (diffInSeconds > requestTimeOut)
                    {
                        continueResponseCheck = false;
                        if (received == "{}")
                        {
                            log.Error($"{callingController} {id} Error while getting msg: " + "Something went wrong");
                            res.message = "Something went wrong";
                        }
                        else
                        {
                            log.Error($"{callingController} {id} Error while getting msg: " + "Connection timeout(hardcoded)");
                            res.message = "Connection timeout";
                        }
                        res.status = "Failure";
                        res.data = null;
                    }
                }
                else
                {
                    continueResponseCheck = false;
                    var JSONwithErr = JObject.Parse(received);
                    bool hasError = JSONwithErr.ContainsKey("error")
                        && string.Equals(JSONwithErr["error"].ToString(), "true", StringComparison.OrdinalIgnoreCase);
                    if (hasError)
                    {
                        res.message = (string)JSONwithErr["message"];
                        res.status = "Failure";
                        res.data = null;
                        log.Error($"{callingController} {id} Error while getting msg (rmq has error):" + res.message);
                    }
                    else
                    {
                        log.Info($"{callingController} {id} Data sent successfully");
                        res.message = "Data sent successfully";
                        res.status = "Success";
                        res.data = JSONwithErr;
                    }
                }
            }
            catch (Exception err)
            {
                // A processing error (e.g. unparsable payload) is retried until the timeout.
                if (diffInSeconds > requestTimeOut)
                {
                    log.Error($"{callingController} {id} Error while getting msg(catch): {err.Message} " + err);
                    continueResponseCheck = false;
                    res.message = err.Message;
                    res.status = "Failure";
                    res.data = null;
                }
                else
                {
                    continueResponseCheck = true;
                }
            }

            if (continueResponseCheck)
            {
                Thread.Sleep(5);
            }
        }
        log.Info($"{callingController} {id} outside loop");
    }
    catch (Exception err)
    {
        // Channel creation or subscription failed; report it instead of throwing.
        log.Error($"{callingController} {id} Error while getting msg(setup): {err.Message} " + err);
        res.message = err.Message;
        res.status = "Failure";
        res.data = null;
    }
    finally
    {
        if (channel != null)
        {
            // Stop deliveries before the queue is removed.
            try
            {
                if (channel.IsOpen && consumer != null && consumer.ConsumerTag != null)
                {
                    channel.BasicCancel(consumer.ConsumerTag);
                    log.Info($"{callingController} {id} consumer closed");
                }
            }
            catch (Exception err)
            {
                log.Error($"{callingController} {id} {requestId} error while cancelling consumer: {err.Message} " + err);
            }

            try
            {
                var messageCount = channel.QueueDelete(requestId, false, false);
                log.Info($"{callingController} {id} {requestId} deleted with {messageCount} messages.");
            }
            catch (Exception err)
            {
                log.Error($"{callingController} {id} {requestId} error while deleting response queue: {err.Message} " + err);
            }

            // Release the per-request channel so channels do not pile up on the shared connection.
            int retryCount = channelCloseRetryCount;
            while (retryCount > 0 && channel.IsOpen)
            {
                try
                {
                    channel.Close();
                    log.Info($"{callingController} {id} response channel closed");
                    break;
                }
                catch (Exception err)
                {
                    log.Error($"{callingController} {id} {requestId} error while closing channel: {err.Message} " + err);
                    retryCount--;
                    Thread.Sleep(channelCloseRetryTime);
                }
            }
        }
    }

    return res;
}