public enum ConsumptionResult
{
Ack,
NackWithRequeue,
NackWithoutRequeue,
RejectWithRequeue,
RejectWithoutRequeue
}
public abstract class Consumer
{
private readonly IConnection _connection;
protected Consumer(IConnection connection)
{
_connection = connection;
}
protected IModel Channel { get; private set; }
public void StartConsuming()
{
if (Channel != null)
{
return;
}
Channel = _connection.CreateModel();
OnStartConsuming();
}
public void StopConsuming()
{
if (Channel == null)
{
return;
}
OnStopConsuming();
Channel?.Dispose();
Channel = null;
}
protected abstract void OnStartConsuming();
protected abstract void OnStopConsuming();
}
public class JsonConsumer<TMessage> : Consumer
where TMessage : class
{
private readonly ConsumptionDelegate<TMessage> _consumptionDelegate;
private readonly object _lockObject = new object();
private readonly string _queueName;
private readonly JSchema _schema;
private readonly JsonSerializerSettings _serializerSettings =
new JsonSerializerSettings
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
Converters = { new StringEnumConverter(true) }
};
private readonly ConsumptionResult _unhandledExceptionResult;
private readonly ConsumptionResult _validationErrorResult;
private EventingBasicConsumer _consumer;
public JsonConsumer(
IConnection connection,
string queueName,
JSchema schema,
ConsumptionDelegate<TMessage> consumptionDelegate,
ConsumptionResult validationErrorResult = ConsumptionResult.RejectWithoutRequeue,
ConsumptionResult unhandledExceptionResult = ConsumptionResult.RejectWithoutRequeue)
: base(connection)
{
_validationErrorResult = validationErrorResult;
_unhandledExceptionResult = unhandledExceptionResult;
_queueName = queueName;
_schema = schema;
_consumptionDelegate = consumptionDelegate;
}
protected override void OnStartConsuming()
{
Channel.QueueDeclare(_queueName, true, false, false);
_consumer = new EventingBasicConsumer(Channel);
_consumer.Received += MessageReceived;
Channel.BasicConsume(_queueName, false, _consumer);
}
protected override void OnStopConsuming()
{
_consumer.Received -= MessageReceived;
_consumer = null;
}
private async void MessageReceived(object sender, BasicDeliverEventArgs args)
{
IModel channel = ((EventingBasicConsumer)sender).Model;
try
{
string json = Encoding.UTF8.GetString(args.Body);
TMessage message;
using (var stringReader = new StringReader(json))
using (var textReader = new JsonTextReader(stringReader))
using (var validatingReader = new JSchemaValidatingReader(textReader) { Schema = _schema })
{
var validationErrors = new List<SchemaValidationEventArgs>();
validatingReader.ValidationEventHandler += (o, e) => { validationErrors.Add(e); };
JsonSerializer serializer = JsonSerializer.Create(_serializerSettings);
message = serializer.Deserialize<TMessage>(validatingReader);
if (validationErrors.Any())
{
ProcessResult(channel, _validationErrorResult, args);
return;
}
}
ConsumptionResult result = await _consumptionDelegate(message, args);
ProcessResult(channel, result, args);
}
catch
{
ProcessResult(channel, _unhandledExceptionResult, args);
}
}
private void ProcessResult(IModel channel, ConsumptionResult result, BasicDeliverEventArgs args)
{
// Channels are not thread-safe
lock (_lockObject)
{
switch (result)
{
case ConsumptionResult.Ack:
channel.BasicAck(args.DeliveryTag, false);
break;
case ConsumptionResult.NackWithRequeue:
channel.BasicNack(args.DeliveryTag, false, true);
break;
case ConsumptionResult.NackWithoutRequeue:
channel.BasicNack(args.DeliveryTag, false, false);
break;
case ConsumptionResult.RejectWithRequeue:
channel.BasicReject(args.DeliveryTag, true);
break;
case ConsumptionResult.RejectWithoutRequeue:
channel.BasicReject(args.DeliveryTag, false);
break;
default:
throw new ArgumentOutOfRangeException();
}
}
}
}