Re: [masstransit-discuss] Some messages are being SKIP'd and end up in error queue

Chris Patterson

Oct 11, 2012, 10:54:02 AM
to masstrans...@googlegroups.com
Let's see your producer and consumer bus factory code so we can try to
identify the issue.

On Thu, Oct 11, 2012 at 9:48 AM, Stif Labarque <stif.l...@gmail.com> wrote:
> Hi
>
> We are seeing strange behaviour.
>
> A message that is picked up by our subscriber most of the time, sometimes
> never makes it to our subscriber.
>
> Here's what we see in the logs:
>
> 2012-10-11
> 16:00:27.8912|DEBUG|MassTransit.Messages|SEND:rabbitmq://server/NameSpace.OurMessage::NameSpace.OurMessage,
> NameSpace
> 2012-10-11
> 16:00:27.8912|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9381|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9381|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9381|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9225|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9225|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9225|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9225|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9225|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9069|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.8912|DEBUG|MassTransit.Messages|SKIP:rabbitmq://server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11 16:00:27.9381|ERROR|MassTransit.Transports.Endpoint|Message retry
> limit exceeded
> rabbitmq:/server/QueueName:08cf75cf-4a92-050f-0050-5683480a0000
> 2012-10-11
> 16:00:27.9694|DEBUG|MassTransit.Messages|SEND:rabbitmq:/server/QueueName_error:08cf75cf-4a92-050f-0050-5683480a0000:
> 2012-10-11
> 16:00:27.9694|INFO|MassTransit.Messages|MOVE:rabbitmq://server/QueueName:rabbitmq://server/QueueName_error:08cf75cf-4a92-050f-0050-5683480a0000:
>
> We get 10 skips, but the message is never delivered to our handler.
> We've registered a log action on BeforeConsumingMessage
> (sbc.BeforeConsumingMessage(() =>
> logger.WriteVerbose("BeforeConsumingMessage"));), but we do not see this log
> statement when this SKIP behaviour occurs.
> To subscribe our handlers we're using Subscribe<TMessage,
> THandlerType>(Action<TMessage, Action<Exception>> handle, int maxRetries).
>
>
> Does anyone have an idea what might be going on?

Jochen

Oct 30, 2012, 5:42:45 AM
to masstrans...@googlegroups.com
Hi Chris!

We ran a number of unit tests and cannot reproduce the issue. It is still present in our staging environment, but only shows up a few days after a fresh install. At first everything works as expected. After a while, some messages are no longer delivered to their handlers: they end up in the error queue without ever reaching our handler code.

Looking at the config MassTransit created in RabbitMQ, we see that our target queue is bound to an exchange X as expected.
X receives messages of types E1 and E2 from their respective exchanges, as expected.
X also receives messages of type MassTransit.Diagnostics.Tracing:GetMessageTraceList.

The subscription code is a bit complicated because handler creation (independent of the messaging technology) is separated from exception handling (dependent on the messaging technology because of its retry mechanisms). Our MassTransitSubscriber subscribes to MassTransit as follows:

        protected override void Subscribe<TMessage, THandlerType>(Action<TMessage, Action<Exception>> handle, int maxRetries)
        {
            serviceBus.SubscribeHandler<TMessage>(message => handle(message, exc =>
                {
                    IConsumeContext<TMessage> context = serviceBus.MessageContext<TMessage>();

                    var extProps = new Dictionary<string, object>
                        {
                            {"Message ID", context.MessageId},
                            {"Message Type", context.MessageType},
                            {"Exception", exc}
                        };

                    if (context.RetryCount < maxRetries)
                    {
                        logger.WriteWarning("Failed to process message. Message will be retried.", extProps);
                        context.RetryLater();
                    }
                    else
                        logger.WriteError("Failed to process message (dead message).", extProps);
                }));

            logger.WriteVerbose(string.Format("MassTransit subscriber {2} subscribed to {0} on {1}.", typeof (TMessage).Name, serviceBus.Endpoint.Address, instanceId));
            this.WriteLine("Subscribed to {0} on {1}.", typeof (TMessage).Name, serviceBus.Endpoint.Address);
        }
        
The delegate that is actually subscribed is a 'safe' invocation of the underlying message handler type. Safe means that the handler is wrapped with logging, exception handling and, if required, disposal. This safe method is defined in MessageBusSubscriber, which is the base class of MassTransitSubscriber:

        protected void Handle<TMessage, THandlerType>(TMessage message, Action<Exception> handleException)
        {
            logger.WriteVerbose(string.Format("Receiving message of type {0} on {1}.", message.GetType().Name, this));

            IMessageHandler<TMessage> handler = null;

            Type handlerType = typeof (THandlerType);
            string messageTypeName = message.GetType().Name;
            string handlerTypeName = handlerType.Name;

            var ep = new Dictionary<string, object> {{"Message", message}};

            logger.Log(LogCategories.Technical, LogSeverity.Info, ep, "Handling {0} on {1}...", messageTypeName, handlerTypeName);

            try
            {
                handler = (IMessageHandler<TMessage>) createHandler(handlerType);
                handler.Handle(message);
                logger.Log(LogCategories.Technical, LogSeverity.Info, "Handling {0} on {1} completed.", messageTypeName, handlerTypeName);
            }
            catch (Exception exc)
            {
                logger.Log(LogCategories.Technical, LogSeverity.Warning, "Handling {0} on {1} failed.", messageTypeName, handlerTypeName);

                try
                {
                    // handle exception can vary based on the message bus technology (retry mechanisms)
                    handleException(exc);
                }
                catch (Exception e)
                {
                    var extProps = new Dictionary<string, object>
                                       {
                                           {"Exception", exc},
                                           {"Error handling exception", e}
                                       };

                    logger.Log(LogCategories.Technical, LogSeverity.Error, extProps, "Exception while handling message handler exception.");
                }
            }
            finally
            {
                // handler is still null if createHandler threw, so guard before disposing
                if (handler != null)
                    handler.As<IDisposable>(d => d.Dispose());
            }
        }

What we see when debugging MassTransit is that MassTransit fails to retrieve an IConsumeContext in MassTransit.Context.ReceiveContext:

T is our expected E2 message type. The result is that the message is skipped until it ends up in the dead message queue. Any clues what could cause this?

The code has gone through a lot of unit tests. It does not fail during those tests, nor during the first few days on a fresh install. We will set up a long-running IIS-based test to try to reproduce it.

Curious to find out what makes this happen!

Jochen

Jochen

Oct 30, 2012, 11:08:21 AM
to masstrans...@googlegroups.com
Here's what we find in the MassTransit logging:

First the subscriptions are done for Event1 and Event2:

2012-10-30 10:17:40.1175|INFO|MassTransit.Pipeline.Sinks.MessageRouter<MassTransit.IConsumeContext<Events.Event2>>|tid_8: Creating messagerouter with 0 sinks
2012-10-30 10:17:40.1175|INFO|MassTransit.Pipeline.Sinks.MessageRouter<MassTransit.IConsumeContext<Events.Event2>>|tid_8: Connecting sink MassTransit.Pipeline.Sinks.InstanceMessageSink`1[[Events.Event2, SDS.Events]], MassTransit
2012-10-30 10:17:40.1385|DEBUG|MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener|SubscribeTo: Events.Event2, SDS.Events, 08cf8496-8b3d-fb65-1803-73444c1b0000
2012-10-30 10:17:40.1385|DEBUG|MassTransit.Subscriptions.Coordinator.BusSubscriptionCache|SubscribeTo: Events.Event2, SDS.Events, 08cf8496-8b3d-fb65-1803-73444c1b0000
2012-10-30 10:17:40.1545|DEBUG|MassTransit.Subscriptions.Coordinator.BusSubscription|SubscribeTo: Events.Event2, SDS.Events, 08cf8496-8b3e-93d1-1803-73444c1b0000
2012-10-30 10:17:40.2345|INFO|MassTransit.Pipeline.Sinks.MessageRouter<MassTransit.IConsumeContext<Events.Event1>>|tid_8: Creating messagerouter with 0 sinks
2012-10-30 10:17:40.2345|INFO|MassTransit.Pipeline.Sinks.MessageRouter<MassTransit.IConsumeContext<Events.Event1>>|tid_8: Connecting sink MassTransit.Pipeline.Sinks.InstanceMessageSink`1[[Events.Event1, SDS.Events]], MassTransit
2012-10-30 10:17:40.2345|DEBUG|MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener|SubscribeTo: Events.Event1, SDS.Events, 08cf8496-8b41-9e99-1803-73444c1b0000
2012-10-30 10:17:40.2425|DEBUG|MassTransit.Subscriptions.Coordinator.BusSubscriptionCache|SubscribeTo: Events.Event1, SDS.Events, 08cf8496-8b41-9e99-1803-73444c1b0000
2012-10-30 10:17:40.2425|DEBUG|MassTransit.Subscriptions.Coordinator.BusSubscription|SubscribeTo: Events.Event1, SDS.Events, 08cf8496-8b41-c5fe-1803-73444c1b0000

This is a failure to find a consumer for Event2:

2012-10-30 10:19:40.7576|INFO|MassTransit.Context.ServiceBusReceiveContext|tid_7: Enumerating consumers for
2012-10-30 10:19:40.7596|INFO|MassTransit.Context.ServiceBusReceiveContext|tid_7: InboundPipeline is MassTransit.Pipeline.InboundMessagePipeline, MassTransit
2012-10-30 10:19:40.7596|INFO|MassTransit.Pipeline.Sinks.InboundConvertMessageSink<Events.Event2>|tid_7: Trying to convert context to IConsumeContext of Event2
2012-10-30 10:19:40.7596|INFO|MassTransit.Context.ReceiveContext|tid_7: Trying to convert to Event2 with typeConverter of type JsonMessageTypeConverter
2012-10-30 10:19:40.7736|INFO|MassTransit.Serialization.JsonMessageTypeConverter|tid_7: Trying to get value from _mapped
2012-10-30 10:19:40.7736|INFO|MassTransit.Serialization.JsonMessageTypeConverter|tid_7: No value in _mapped
2012-10-30 10:19:40.7736|INFO|MassTransit.Serialization.JsonMessageTypeConverter|tid_7: Is Event2 a supported type? (supported types: urn:message:Events:Event1, urn:message:SDS.Events:BaseEvent, urn:message:EventSourcing:IEvent)
2012-10-30 10:19:40.7736|INFO|MassTransit.Serialization.JsonMessageTypeConverter|tid_7: NO
2012-10-30 10:19:40.7736|INFO|MassTransit.Serialization.JsonMessageTypeConverter|tid_7: Setting _mapped[Event2] = null and returning false
2012-10-30 10:28:16.4361|INFO|MassTransit.Pipeline.Sinks.InboundConvertMessageSink<Events.Event2>|Conversion failed

Is it possible that the common BaseEvent for Event1 and Event2 makes Event2 end up in the sink for Event1?
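
To make the failing log lines concrete, here is a minimal sketch of the check they appear to describe: the incoming message carries a list of type URNs, the converter looks for the requested type in that list, and the answer is cached per requested type. This is only a model built from the log output, not MassTransit's actual code. The names TypeMatchSketch and CanConvertTo are made up for illustration, and the URN urn:message:Events:Event2 is a guess by analogy with the Event1 URN shown in the log.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of the type check seen in the log; NOT MassTransit's implementation.
public sealed class TypeMatchSketch
{
    // Type URNs carried by the incoming message (the "supported types" in the log).
    private readonly HashSet<string> supportedTypes;

    // Per-type cache of earlier answers, playing the role of "_mapped" in the log.
    private readonly Dictionary<string, bool> mapped = new Dictionary<string, bool>();

    public TypeMatchSketch(IEnumerable<string> supportedTypeUrns)
    {
        supportedTypes = new HashSet<string>(supportedTypeUrns);
    }

    // Returns true only if the requested URN is one of the message's declared types.
    public bool CanConvertTo(string requestedUrn)
    {
        bool cached;
        if (mapped.TryGetValue(requestedUrn, out cached))
            return cached;

        bool supported = supportedTypes.Contains(requestedUrn);
        mapped[requestedUrn] = supported; // a negative answer is cached as well
        return supported;
    }
}

public static class Program
{
    public static void Main()
    {
        // The three URNs are copied from the log line; the Event2 URN is assumed.
        var message = new TypeMatchSketch(new[]
        {
            "urn:message:Events:Event1",
            "urn:message:SDS.Events:BaseEvent",
            "urn:message:EventSourcing:IEvent"
        });

        Console.WriteLine(message.CanConvertTo("urn:message:Events:Event2")); // not in the list
        Console.WriteLine(message.CanConvertTo("urn:message:Events:Event1")); // in the list
    }
}
```

Under this model, a message whose type list contains Event1, BaseEvent and IEvent can never be converted to Event2, which would match the "Conversion failed" line if an Event1 message is being offered to the Event2 sink.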