Timeout with AsyncEventingBasicConsumer when creating new channel or declaring a queue

961 views
Skip to first unread message

Mantas Audickas

unread,
Aug 27, 2018, 6:20:19 AM8/27/18
to rabbitmq-users
Hello,

using .NET client for publishing and subscribing to RabbitMQ server.
We use also additional handling - when error occurs, on some conditions we want to send erroneous message to another queue.
However - it looks like it is not possible to create another model (or declare another queue if channel is reused) because of timeout exception.

The code which reproduces the behavior:

public void StartConsumer(string exchangeName, string queueName, AsyncEventHandler<BasicDeliverEventArgs> onMessageReceived)
{
    var connection = _connection.GetConnection();
    _channel = connection.CreateModel();

    _channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true);
    _channel.QueueDeclare(queueName, true, false, _connection.AutoDeleteQueues, null);
    _channel.QueueBind(queueName, exchangeName, "#", null);


    // AsyncEventingBasicConsumer causes timeout when trying to create another Channel on the same Connection... 
    // also if Chanel is reused then QueueDeclare is failing with the timeout.
    // this does not happen with EventingBasicConsumer
    var consumer = new AsyncEventingBasicConsumer(_channel);
    consumer.Received += (sender, args) => OnMessageReceived(connection, _channel, args);
    _channel.BasicConsume(queueName, false, consumer);
}

public async Task OnMessageReceived(IConnection connection, IModel channel, BasicDeliverEventArgs received)
{
    try
    {
        Console.WriteLine("Message received!");
        
        // exception happens here!
        var otherChannel = connection.CreateModel();
        otherChannel.QueueDeclare("test", true, false, false, null);
        channel.BasicAck(received.DeliveryTag, false);
        Console.WriteLine("Message consumed!");
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        channel.BasicNack(received.DeliveryTag, false, true);
        throw;
    }
}

Exception which happens:
System.TimeoutException: The operation has timed out.
   at RabbitMQ.Util.BlockingCell.GetValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ChannelOpen(String outOfBand)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateNonRecoveringModel()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at Messaging.Registration`1.OnMessageReceived(IConnection connection, BasicDeliverEventArgs received)
in d:\dev\messaging\sources\Research\Registration.cs:line 71



Also tried to reuse the channel, so that QueueDeclared is called as:
channel.QueueDeclare("test", true, false, false, null);

In this case timeout also happens:
System.TimeoutException: The operation has timed out.
   at RabbitMQ.Util.BlockingCell.GetValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at Messaging.Registration`1.OnMessageReceived(IConnection connection, BasicDeliverEventArgs received) 
in in d:\dev\messaging\sources\Research\Registration.cs:line 71


However - in this case queue is created in server...

On the other side if I use EventingBasicConsumer, all works fine without any issues.

Anything I am doing wrong here? Any hints are appreciated.

Michael Klishin

unread,
Aug 30, 2018, 1:59:37 AM8/30/18
to rabbitm...@googlegroups.com
AsyncEventingBasicConsumer and its internal dispatch machinery is a pretty complex beast and we cannot
immediately suggest what cases the timeout. We also don't guess on this list. Please provide an executable way to reproduce
(e.g. a GitHub repo that demonstrates a self-contained piece of code) , RabbitMQ, .NET and OS versions as well as server logs.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

JJ Banda

unread,
Feb 6, 2020, 11:36:01 AM2/6/20
to rabbitmq-users
I'm running into the same issue and would like to know why this code does not work. Reproduction repo:


Steps:
1. Start the RabbitReceiver project to set up message receive handling
2. Start the RabbitSender project to set up message pushing
Result: RabbitReceiver throws the exception mentioned in OP

Server logs attached.

I tested this in both Windows and Linux environments and the issue is present in both. I have Rabbit running in a docker container. As mentioned in the OP, changing AsyncEventingBasicConsumer to EventingBasicConsumer (and disabling DispatchConsumersAsync) makes everything work.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitm...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.
server.log
Reply all
Reply to author
Forward
0 new messages