You need to implement a IMessageModule to intercept the errors. In
the Init you hook ITransport.MessageProcessingFailure and, most
likely, ITransport.OnMessageSerializationException. The main trick I
found is the failure method gets called for every failure even if a
retry will happen. If you really only want to catch the final failure
after all retries have been exhausted, you need to handle that
yourself. For what it's worth, here's an example from one of my
projects. The public static NumberOfTries is set during bus
configuration so we know the configured retry count:
using System;
using Rhino.ServiceBus;
using Rhino.ServiceBus.DataStructures;
using Rhino.ServiceBus.Impl;
using Rhino.ServiceBus.Internal;
using Rhino.ServiceBus.MessageModules;
using RICOH.SJDF.ApplicationServices.ServiceBus.Messages;
namespace RICOH.SJDF.ApplicationServices.ServiceBus
{
internal class FailedMessageModule: IMessageModule
{
public static int NumberOfTries;
private readonly Hashtable<string, TryCount> _tryCounts = new
Hashtable<string, TryCount>();
private Messenger _messenger;
public void Init(ITransport transport, IServiceBus bus)
{
_messenger = new Messenger(bus);
transport.MessageProcessingFailure +=
OnMessageProcessingFailure;
transport.MessageProcessingCompleted +=
OnMessageProcessingCompleted;
transport.MessageSerializationException +=
OnMessageSerializationException;
}
public void Stop(ITransport transport, IServiceBus bus)
{
}
private void
OnMessageProcessingFailure(CurrentMessageInformation messageInfo,
Exception exception)
{
TryCount tries = null;
_tryCounts.Write(writer =>
{
if (writer.TryGetValue(messageInfo.TransportMessageId,
out tries) == false)
{
tries = new TryCount { Count = 0 };
writer.Add(messageInfo.TransportMessageId, tries);
}
tries.Count += 1;
});
if (tries != null && tries.Count >= NumberOfTries)
{
HandleFailedMessage(messageInfo, exception);
}
}
private void
OnMessageProcessingCompleted(CurrentMessageInformation
messageInformation, Exception exception)
{
if (exception != null) return;
TryCount tries = null;
_tryCounts.Read(reader =>
reader.TryGetValue(messageInformation.TransportMessageId, out tries));
if (tries == null) return;
_tryCounts.Write(writer =>
writer.Remove(messageInformation.TransportMessageId));
}
private void
OnMessageSerializationException(CurrentMessageInformation messageInfo,
Exception exception)
{
HandleFailedMessage(messageInfo, exception);
}
private void HandleFailedMessage(CurrentMessageInformation
messageInfo, Exception exception)
{
if (messageInfo.Message is DoTerminateSaga) return;
var sagaMessage = messageInfo.Message as SagaMessageBase;
if (sagaMessage == null) return;
_messenger.DoCommand(
new DoTerminateSaga
{
CorrelationId = sagaMessage.CorrelationId,
Reason = exception == null ? "Unknown" :
exception.Message
});
}
private class TryCount
{
public int Count;