Pipelining of requests forbidden

Vnoth Vijaya

Sep 21, 2022, 3:21:43 AM
to rabbitmq-users
Hi all,

I am new to RabbitMQ and am using RabbitMQ.Client for .NET.

In some cases I get the error "Pipelining of requests forbidden" while consuming a message from a queue.

Code below:
_channel.QueueDeclare(queue, true, false, false);
var count = _channel.MessageCount(queue);
if (count > 0)
{
    // BasicGet returns null if the queue is empty by the time it runs
    var queueresult = _channel.BasicGet(queue, true);
    if (queueresult != null)
    {
        var jsonSpecified = Encoding.UTF8.GetString(queueresult.Body.Span);
        var result = JsonConvert.DeserializeObject<ImageProcessPreVector>(jsonSpecified);

        return result;
    }
}

Exception below:
    
Pipelining of requests forbidden
   at RabbitMQ.Client.Impl.RpcContinuationQueue.Enqueue(IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.Enqueue(IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)

Could anyone suggest a solution?
       

Regards,
Vinoth E.

Luke Bakken

Sep 23, 2022, 11:57:34 AM
to rabbitmq-users
Hi Vinoth,

At first glance your code should not trigger that error. In order for us to assist, please provide a complete code example that I can compile and run to see this issue.

Ideally, you would provide a repository (on GitHub or GitLab, for instance) that I can clone and compile.

Thank you,
Luke

Vnoth Vijaya

Sep 27, 2022, 3:29:38 AM
to rabbitmq-users
Thanks a lot, Luke.

Apologies, I am not able to upload the source due to company policy.

// A sample is provided below.
// Note: the application below is a Web API deployed in IIS.

//===========================================================
// The method below is used to push a message to the queue.
// No issue was found in this method.
[HttpPost]
public async Task<IActionResult> Any2PDF([FromBody] Any2Pdf request)
{
    Log log = new Log();
    try
    {
        await _busControl.SendAsync(_queue.Value.Any2PDFQueue, request);
        return Ok(new ResponseModel() { IsSuccess = true, Message = string.Empty });
    }
    catch (Exception ex)
    {
        log.apiLog("OwnController/Any2PDF " + ex.Message + ex.StackTrace, logPath);
        return Ok(new ResponseModel() { IsSuccess = false, Message = ex.ToString() });
    }
}
       
public async Task SendAsync<T>(string queue, T message)
{
    await Task.Run(() =>
    {
        _channel.QueueDeclare(queue, true, false, false);
        var properties = _channel.CreateBasicProperties();
        properties.Persistent = false;
        var output = JsonConvert.SerializeObject(message);
        // Publish to the default exchange, routed by queue name, with the properties set above
        _channel.BasicPublish(string.Empty, queue, properties,
            Encoding.UTF8.GetBytes(output));
    });
}

//========================================================================

// The method below is the API GET method.
// A background service calls this method continuously, every 30 seconds.
// If data exists in the queue, it is returned.
// Finding: if the queue has been empty for a long time, the "Pipelining of requests forbidden" exception is thrown.


[HttpGet]
public async Task<IActionResult> Any2PDF()
{
    Log log = new Log();
    try
    {
        var result = await _busControl.ReceiveAny2PDF(_queue.Value.Any2PDFQueue);

        return Ok(result);
    }
    catch (Exception ex)
    {
        log.apiLog("Consumer/Any2PDF " + ex.Message + ex.StackTrace, logPath);
        return Ok(new ResponseModel() { IsSuccess = false, Message = ex.ToString() });
    }
}

public async Task<Any2Pdf> ReceiveAny2PDF(string queue)
{
    try
    {
        _channel.QueueDeclare(queue, true, false, false); // in some cases: Pipelining of requests forbidden

        //var consumer = new AsyncEventingBasicConsumer(_channel);
        var count = _channel.MessageCount(queue);
        if (count > 0)
        {
            // BasicGet returns null if the queue is empty by the time it runs
            var queueresult = _channel.BasicGet(queue, true);
            if (queueresult != null)
            {
                var jsonSpecified = Encoding.UTF8.GetString(queueresult.Body.Span);
                var result = JsonConvert.DeserializeObject<Any2Pdf>(jsonSpecified);
                return result;
            }
        }
    }
    catch (Exception)
    {
        throw; // rethrow without resetting the stack trace
    }
    return new Any2Pdf();
}
//=======================================================


Regards,
Vinoth E.

Luke Bakken

Sep 27, 2022, 10:05:52 AM
to rabbitmq-users
Hi Vinoth,

I can't really give many suggestions with an incomplete example such as this.

When you see the pipelining error message, it means that multiple threads have used the same channel at the same time to send requests to RabbitMQ. Connections can be shared across threads, but channels can't.
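
The stack trace earlier in the thread shows the error being raised from the channel's RPC continuation queue (`ModelBase.Enqueue`), so one way to keep requests from overlapping is to let only one thread at a time use the shared `_channel`. The following is a minimal sketch under that assumption, written against RabbitMQ.Client 6.x. The `LockedChannel` class and its `TryGetString` method are hypothetical names for illustration and are not part of the client library.

```csharp
using System.Text;
using RabbitMQ.Client;

// Hypothetical wrapper (not part of RabbitMQ.Client): lets only one thread
// at a time issue requests on a shared channel.
public sealed class LockedChannel
{
    private readonly IModel _channel;
    private readonly object _gate = new object();

    public LockedChannel(IModel channel)
    {
        _channel = channel;
    }

    // Returns the next message body as a string, or null when the queue is empty.
    public string TryGetString(string queue)
    {
        lock (_gate)
        {
            // Both calls below are request/response operations on the channel,
            // so they run inside the lock.
            _channel.QueueDeclare(queue, true, false, false, null);
            BasicGetResult result = _channel.BasicGet(queue, true);
            return result == null ? null : Encoding.UTF8.GetString(result.Body.Span);
        }
    }
}
```

For this to help, every code path that touches the same channel (including the publishing path, which here runs inside `Task.Run`) would have to go through the same lock.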

How are you creating your connection and channel objects?

Since you're calling Any2PDF every 30 seconds to retrieve at most one queue message, it would be appropriate to open a new connection and channel for the lifetime of that method. Normally this practice is avoided as I'm sure you know.

Thanks,
Luke

Vnoth Vijaya

Sep 28, 2022, 4:04:51 AM
to rabbitmq-users
Hi Luke,

Thanks a lot.

I am creating the connection in the Startup class.

I will change the code to create the connection for the lifetime of the method.



Regards,
Vinoth E

Vnoth Vijaya

Sep 29, 2022, 9:21:13 AM
to rabbitmq-users
Hi Luke,

Thanks a lot.

As discussed, I now create the connection and channel for the lifetime of the method.
Is the method below correct?

public async Task<T> ReceiveAsyncPriority<T>(string queue, T model)
{
    IDictionary<string, object> args = new Dictionary<string, object>();
    args.Add("x-max-priority", 10);
    try
    {
        var factory = new ConnectionFactory()
        {
            HostName = HostName,
            Port = HostPort,
            VirtualHost = VirtualHost,
            UserName = UserName,
            Password = Password,
            DispatchConsumersAsync = true
        };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue, true, false, false, args);
            var count = channel.MessageCount(queue);
            if (count > 0)
            {
                // BasicGet returns null if the queue is empty by the time it runs
                var queueresult = channel.BasicGet(queue, true);
                if (queueresult != null)
                {
                    var jsonSpecified = Encoding.UTF8.GetString(queueresult.Body.Span);
                    var result = JsonConvert.DeserializeObject<T>(jsonSpecified);
                    return result;
                }
            }
        }
        return model;
    }
    catch (Exception ex)
    {
        log.apiLog("Catch Queue_ :" + queue + ex.Message + ex.StackTrace, _logpath);

        return model;
    }
}

Now I am getting the errors below in some cases:

None of the specified endpoints were reachable
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)

The operation has timed out.
   at RabbitMQ.Util.BlockingCell`1.WaitForValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ChannelOpen(String outOfBand)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateNonRecoveringModel()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at Kromatix_Api.RabbitQueue.RabbitBus.ReceiveAsyncPriority[T](String queue, T model, String logpath)

   
Kindly suggest how to resolve this error.

Regards,
Vinoth E

Luke Bakken

Sep 29, 2022, 9:45:20 AM
to rabbitmq-users
"None of the specified endpoints were reachable" means that a TCP connection could not be made to the host.

Please provide the InnerException information as well, which will show the cause.
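
A minimal sketch of how the inner exception details could be captured for the log, assuming RabbitMQ.Client 6.x, where a failed `CreateConnection` throws `BrokerUnreachableException`. The `ConnectionDiagnostics` class and its two methods are hypothetical names for illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

public static class ConnectionDiagnostics
{
    // Hypothetical helper: walks the InnerException chain and collects
    // the type, message and stack trace of each exception in it.
    public static string DescribeExceptionChain(Exception ex)
    {
        var sb = new StringBuilder();
        for (Exception current = ex; current != null; current = current.InnerException)
        {
            sb.AppendLine(current.GetType().FullName + ": " + current.Message);
            sb.AppendLine(current.StackTrace);
        }
        return sb.ToString();
    }

    // Tries to open a connection; on failure writes the whole chain to stderr and returns null.
    public static IConnection TryConnect(ConnectionFactory factory)
    {
        try
        {
            return factory.CreateConnection();
        }
        catch (BrokerUnreachableException ex)
        {
            Console.Error.WriteLine(DescribeExceptionChain(ex));
            return null;
        }
    }
}
```

Logging `ex.Message + ex.StackTrace`, as the code in this thread does, leaves the inner exception out. Logging `ex.ToString()` is a simpler alternative that includes the inner exceptions as well.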

Vnoth Vijaya

Oct 2, 2022, 10:03:36 AM
to rabbitmq-users
Hi Luke,

Thanks a lot.

The inner exception is as follows:
System.TimeoutException: The operation has timed out.

   at RabbitMQ.Util.BlockingCell`1.WaitForValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ConnectionStartOk(IDictionary`2 clientProperties, String mechanism, Byte[] response, String locale)
   at RabbitMQ.Client.Framing.Impl.Connection.StartAndTune()
   at RabbitMQ.Client.Framing.Impl.Connection.Open(Boolean insist)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IFrameHandler fh)

   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)

Regards,
Vinoth E.

Luke Bakken

Oct 5, 2022, 11:08:03 AM
to rabbitmq-users
Here is another user having issues with the .NET client and IIS - https://groups.google.com/g/rabbitmq-users/c/PQefm8CzmzU

Please take the time to provide me with code that I can run. It doesn't have to be the same code you're using, just something simple enough to reproduce this issue. I would also appreciate instructions for how to build and run the code using IIS (which I am getting set up right now).

Thanks,
Luke
