Guidance on interacting with IModel within EventingBasicConsumer.Received event handler

160 views
Skip to first unread message

Nathan Alden, Sr.

unread,
Feb 22, 2017, 3:01:11 PM2/22/17
to rabbitmq-users
I found this GitHub issue that references the exact question I have: https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/145. However, it does not seem like the question was answered in a concise way. Allow me to ask the question again.

I wrote a couple of classes to help manage eventing consumption, stripped down for the purposes of this post:

/// <summary>
/// The ways a consumed delivery can be settled with the broker.
/// </summary>
public enum ConsumptionResult
{
    /// <summary>Acknowledge the delivery.</summary>
    Ack = 0,

    /// <summary>Negatively acknowledge the delivery and ask for it to be requeued.</summary>
    NackWithRequeue = 1,

    /// <summary>Negatively acknowledge the delivery without requeueing it.</summary>
    NackWithoutRequeue = 2,

    /// <summary>Reject the delivery and ask for it to be requeued.</summary>
    RejectWithRequeue = 3,

    /// <summary>Reject the delivery without requeueing it.</summary>
    RejectWithoutRequeue = 4
}

/// <summary>
/// Base class for consumers that own a single channel created from a shared connection.
/// The channel is created by <see cref="StartConsuming"/> and disposed by
/// <see cref="StopConsuming"/>; subclasses hook into both via
/// <see cref="OnStartConsuming"/> and <see cref="OnStopConsuming"/>.
/// </summary>
public abstract class Consumer
{
    private readonly IConnection _connection;

    /// <summary>
    /// Initializes the consumer with the connection used to create its channel.
    /// </summary>
    /// <param name="connection">The connection channels are created from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
    protected Consumer(IConnection connection)
    {
        // Fail here rather than with a NullReferenceException inside StartConsuming.
        if (connection == null)
        {
            throw new ArgumentNullException(nameof(connection));
        }

        _connection = connection;
    }

    /// <summary>
    /// The channel owned by this consumer; null whenever the consumer is not started.
    /// </summary>
    protected IModel Channel { get; private set; }

    /// <summary>
    /// Creates the channel and lets the subclass begin consuming. Does nothing if the
    /// consumer is already started.
    /// </summary>
    public void StartConsuming()
    {
        if (Channel != null)
        {
            return;
        }

        Channel = _connection.CreateModel();

        try
        {
            OnStartConsuming();
        }
        catch
        {
            // Roll back: without this, a failed start would leave Channel assigned, so the
            // channel would never be disposed and every later StartConsuming call would
            // return early without doing anything.
            ReleaseChannel();
            throw;
        }
    }

    /// <summary>
    /// Lets the subclass stop consuming and disposes the channel. Does nothing if the
    /// consumer is not started.
    /// </summary>
    public void StopConsuming()
    {
        if (Channel == null)
        {
            return;
        }

        try
        {
            OnStopConsuming();
        }
        finally
        {
            // Dispose the channel even when the subclass hook throws.
            ReleaseChannel();
        }
    }

    /// <summary>Called after the channel has been created.</summary>
    protected abstract void OnStartConsuming();

    /// <summary>Called before the channel is disposed.</summary>
    protected abstract void OnStopConsuming();

    /// <summary>
    /// Clears <see cref="Channel"/> and disposes the channel it referenced.
    /// </summary>
    private void ReleaseChannel()
    {
        IModel channel = Channel;

        // Clear the property first so a throwing Dispose cannot leave a dead channel assigned.
        Channel = null;
        channel?.Dispose();
    }
}

/// <summary>
/// Consumes messages from a single queue, validates each body against a JSON schema while
/// deserializing it to <typeparamref name="TMessage"/>, hands the message to a consumption
/// delegate and settles the delivery (ack, nack or reject) according to the resulting
/// <see cref="ConsumptionResult"/>.
/// </summary>
/// <typeparam name="TMessage">The type message bodies are deserialized to.</typeparam>
public class JsonConsumer<TMessage> : Consumer
    where TMessage : class
{
    private readonly ConsumptionDelegate<TMessage> _consumptionDelegate;
    private readonly object _lockObject = new object();
    private readonly string _queueName;
    private readonly JSchema _schema;

    private readonly JsonSerializerSettings _serializerSettings =
        new JsonSerializerSettings
        {
            ContractResolver = new CamelCasePropertyNamesContractResolver(),
            Converters = { new StringEnumConverter(true) }
        };

    private readonly ConsumptionResult _unhandledExceptionResult;
    private readonly ConsumptionResult _validationErrorResult;
    private EventingBasicConsumer _consumer;

    /// <summary>
    /// Initializes the consumer.
    /// </summary>
    /// <param name="connection">The connection the channel is created from.</param>
    /// <param name="queueName">The queue to declare and consume from.</param>
    /// <param name="schema">The schema message bodies are validated against.</param>
    /// <param name="consumptionDelegate">Invoked for every message that passes validation.</param>
    /// <param name="validationErrorResult">How to settle a delivery whose body fails schema validation.</param>
    /// <param name="unhandledExceptionResult">How to settle a delivery whose processing throws.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="queueName"/> or <paramref name="consumptionDelegate"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="validationErrorResult"/> or <paramref name="unhandledExceptionResult"/>
    /// is not a defined <see cref="ConsumptionResult"/> value.
    /// </exception>
    public JsonConsumer(
        IConnection connection,
        string queueName,
        JSchema schema,
        ConsumptionDelegate<TMessage> consumptionDelegate,
        ConsumptionResult validationErrorResult = ConsumptionResult.RejectWithoutRequeue,
        ConsumptionResult unhandledExceptionResult = ConsumptionResult.RejectWithoutRequeue)
        : base(connection)
    {
        if (queueName == null)
        {
            throw new ArgumentNullException(nameof(queueName));
        }

        if (consumptionDelegate == null)
        {
            throw new ArgumentNullException(nameof(consumptionDelegate));
        }

        // Reject undefined fallback results up front; otherwise they would only surface
        // later, while settling a delivery, where there is no caller to report them to.
        if (!Enum.IsDefined(typeof(ConsumptionResult), validationErrorResult))
        {
            throw new ArgumentOutOfRangeException(nameof(validationErrorResult));
        }

        if (!Enum.IsDefined(typeof(ConsumptionResult), unhandledExceptionResult))
        {
            throw new ArgumentOutOfRangeException(nameof(unhandledExceptionResult));
        }

        _validationErrorResult = validationErrorResult;
        _unhandledExceptionResult = unhandledExceptionResult;
        _queueName = queueName;
        _schema = schema;
        _consumptionDelegate = consumptionDelegate;
    }

    /// <summary>
    /// Declares the queue, creates the eventing consumer and starts consuming without
    /// automatic acknowledgement (the third <c>BasicConsume</c> argument receives the consumer,
    /// the second is passed as <c>false</c>).
    /// </summary>
    protected override void OnStartConsuming()
    {
        Channel.QueueDeclare(_queueName, true, false, false);

        _consumer = new EventingBasicConsumer(Channel);
        _consumer.Received += MessageReceived;

        Channel.BasicConsume(_queueName, false, _consumer);
    }

    /// <summary>
    /// Detaches the message handler and drops the consumer reference.
    /// </summary>
    protected override void OnStopConsuming()
    {
        // _consumer is still null if OnStartConsuming failed before creating it.
        if (_consumer != null)
        {
            _consumer.Received -= MessageReceived;
            _consumer = null;
        }
    }

    /// <summary>
    /// Handles one delivery: decides how it should be settled, then settles it exactly once.
    /// </summary>
    /// <remarks>
    /// This is an <c>async void</c> event handler, so no exception may be allowed to escape:
    /// there is no caller that could observe it. Both phases therefore catch everything.
    /// </remarks>
    private async void MessageReceived(object sender, BasicDeliverEventArgs args)
    {
        IModel channel = ((EventingBasicConsumer)sender).Model;
        ConsumptionResult result;

        // Phase 1: work out the result. The channel is not touched here, so a failure in
        // this phase can never lead to a second settlement attempt for the same delivery.
        try
        {
            TMessage message;

            if (TryReadMessage(args, out message))
            {
                result = await _consumptionDelegate(message, args);

                // Treat an undefined value returned by the delegate like a failed delegate.
                if (!Enum.IsDefined(typeof(ConsumptionResult), result))
                {
                    result = _unhandledExceptionResult;
                }
            }
            else
            {
                result = _validationErrorResult;
            }
        }
        catch (Exception exception)
        {
            System.Diagnostics.Trace.TraceError(
                "Processing a message from queue '{0}' failed: {1}", _queueName, exception);

            result = _unhandledExceptionResult;
        }

        // Phase 2: settle the delivery. This can fail, e.g. when the channel was closed or
        // disposed while the delegate was still running.
        try
        {
            ProcessResult(channel, result, args);
        }
        catch (Exception exception)
        {
            System.Diagnostics.Trace.TraceError(
                "Settling a delivery from queue '{0}' as {1} failed: {2}", _queueName, result, exception);
        }
    }

    /// <summary>
    /// Decodes the delivery body as UTF-8 JSON and deserializes it through a schema-validating
    /// reader.
    /// </summary>
    /// <param name="args">The delivery whose body is read.</param>
    /// <param name="message">The deserialized message.</param>
    /// <returns>True when no schema validation errors were reported; otherwise false.</returns>
    private bool TryReadMessage(BasicDeliverEventArgs args, out TMessage message)
    {
        string json = Encoding.UTF8.GetString(args.Body);

        using (var stringReader = new StringReader(json))
        using (var textReader = new JsonTextReader(stringReader))
        using (var validatingReader = new JSchemaValidatingReader(textReader) { Schema = _schema })
        {
            var validationErrors = new List<SchemaValidationEventArgs>();

            // Collect validation errors through the event handler while the body is read.
            validatingReader.ValidationEventHandler += (o, e) => { validationErrors.Add(e); };

            JsonSerializer serializer = JsonSerializer.Create(_serializerSettings);

            message = serializer.Deserialize<TMessage>(validatingReader);

            return validationErrors.Count == 0;
        }
    }

    /// <summary>
    /// Settles the delivery on the channel according to <paramref name="result"/>.
    /// </summary>
    /// <param name="channel">The channel the delivery arrived on.</param>
    /// <param name="result">How the delivery is settled.</param>
    /// <param name="args">The delivery; its delivery tag identifies what is settled.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="result"/> is not a defined <see cref="ConsumptionResult"/> value.
    /// </exception>
    private void ProcessResult(IModel channel, ConsumptionResult result, BasicDeliverEventArgs args)
    {
        // Channels are not thread-safe. MessageReceived awaits the consumption delegate, so
        // several deliveries can reach this point concurrently; serialize channel access.
        lock (_lockObject)
        {
            switch (result)
            {
                case ConsumptionResult.Ack:
                    channel.BasicAck(args.DeliveryTag, false);
                    break;
                case ConsumptionResult.NackWithRequeue:
                    channel.BasicNack(args.DeliveryTag, false, true);
                    break;
                case ConsumptionResult.NackWithoutRequeue:
                    channel.BasicNack(args.DeliveryTag, false, false);
                    break;
                case ConsumptionResult.RejectWithRequeue:
                    channel.BasicReject(args.DeliveryTag, true);
                    break;
                case ConsumptionResult.RejectWithoutRequeue:
                    channel.BasicReject(args.DeliveryTag, false);
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(result));
            }
        }
    }
}

Do I need the lock statement in the ProcessResult method? I assumed that the EventingBasicConsumer.Received handlers can be called on any thread and I know from my research that IModel is not thread-safe. I want to avoid a situation where multiple messages are being consumed and they are all trying to access the same underlying IModel instance.

Nathan Alden, Sr.

unread,
Feb 22, 2017, 3:41:41 PM2/22/17
to rabbitmq-users
https://www.rabbitmq.com/dotnet-api-guide.html seems to indicate that yes, I do need the lock. Some information about how the Received event is invoked from a threading standpoint would be appreciated.

Michael Klishin

unread,
Feb 22, 2017, 4:06:56 PM2/22/17
to rabbitm...@googlegroups.com
Our docs are more conservative than they could be because providing various examples can be really
laborious.

Consumer operations are dispatched in a custom thread pool that guarantees per-channel ordering. There isn't
anything particularly clever.

Sharing a channel for publishing is definitely not supported and will result in incorrect frame interleaving
on the wire because even publishing a 1 byte message actually translates into 3 frames:
[basic publish][content header][content body] 

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Nathan Alden, Sr.

unread,
Feb 23, 2017, 9:19:52 AM2/23/17
to rabbitmq-users
Thanks for the reply. I did see all the guidance around publishers, which is why my publisher class is completely self-contained and explicitly marked as not thread-safe. It seems like I have the right idea for the lock in my event handler, then; it should mitigate any threading strategy you folks use internally.
Reply all
Reply to author
Forward
0 new messages