Transaction handling

Martin Nilsson

Jun 4, 2012, 5:11:02 AM
to masstransit-discuss
I'm trying to wrap my consumers in a transaction. This is the recommended way to handle a unit of work (UoW):
http://stackoverflow.com/questions/5569320/unit-of-work-when-using-masstransit/5572031#5572031

LocalBus = ServiceBusConfigurator.New(x =>
{
    x.ReceiveFrom("loopback://localhost/mt_client");

    x.BeforeConsumingMessage(() => { _before.Set(); });
    x.AfterConsumingMessage(() => { _after.Set(); });
});

I'm using RabbitMQ, so no DTC.

This is my current setup:

sbc.BeforeConsumingMessage(() =>
{
    scope = new TransactionScope(TransactionScopeOption.RequiresNew);
});
sbc.AfterConsumingMessage(() =>
{
    scope.Complete();
    scope.Dispose();
});

Because AfterConsumingMessage executes even when there is an exception, the transaction will *always* be committed.

We are replacing NServiceBus with MT, and in NSB I solved it by implementing IMessageDispatcherFactory.

Something like this:
public IEnumerable<Action> GetDispatcher(Type messageHandlerType, IBuilder builder, object toHandle)
{
    yield return () =>
    {
        using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
        {
            foreach (var action in innerFactory.GetDispatcher(messageHandlerType, builder, toHandle))
            {
                action.Invoke();
            }
            scope.Complete();
        }
    };
}

Could anyone please guide me on how to handle this in MT?

//martin




Chris Patterson

Jun 4, 2012, 12:24:53 PM
to masstrans...@googlegroups.com
You need to write your own consumer factory that will manage the lifecycle of your consumer within the transaction.

And you're right, the RabbitMQ transport does not communicate with the DTC. If you want the message to stay on the queue, you must throw the exception out of the consumer so that the transport can Nack it.


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-dis...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

Mick Delaney

Jun 4, 2012, 4:01:21 PM
to masstrans...@googlegroups.com
something like this:

 public class WindsorConsumerFactory<T> : IConsumerFactory<T> where T : class
    {
        readonly ILog _logger = LogManager.GetLogger(typeof (WindsorConsumerFactory<T>));
        readonly IWindsorContainer _container;

        public WindsorConsumerFactory(IWindsorContainer container)
        {
            _container = container;
        }

        public IEnumerable<Action<IConsumeContext<TMessage>>> GetConsumer<TMessage>(IConsumeContext<TMessage> context, 
            InstanceHandlerSelector<T, TMessage> selector) where TMessage : class
        {
            using (_container.BeginScope())
            {
                var sessionFactory = BindLazySession();
                var eventDispatcher = _container.Resolve<IDispatchEvents>();

                T consumer = null;

                try
                {
                    consumer = _container.Resolve<T>();
                    if (consumer == null)
                    {
                        throw new ConfigurationException(string.Format("Unable to resolve type '{0}' from container: ", typeof(T)));
                    }

                    //TODO: Security
                    //Attach Security Context From Message Into Container As A Scoped Resource
                    //AttachSecurityContext<TMessage>();

                    foreach (var handler in selector(consumer, context))
                    {
                        yield return handler;
                    }

                    try
                    {

                        CommitLazySessionIfActive(sessionFactory);
                    }
                    catch(Exception e)
                    {
                        _logger.Error(string.Format("Exception thrown saving to database. Message Type: {0}. ", typeof(T).Name), e);
                        throw;
                    }

                    eventDispatcher.Dispatch();
                }
                finally
                {
                    _container.Release(consumer);
                }

                DisposeSessionIfActive(sessionFactory);
            }
        }
        
        ISessionFactory BindLazySession()
        {
            var sessionFactory = _container.Resolve<ISessionFactory>();
            LazySessionContext.Bind(new Lazy<ISession>(() => BeginSession(sessionFactory)), sessionFactory);
            return sessionFactory;
        }
        static ISession BeginSession(ISessionFactory sessionFactory)
        {
            var session = sessionFactory.OpenSession();
            session.BeginTransaction();
            return session;
        }

        static void CommitLazySessionIfActive(ISessionFactory sessionFactory)
        {
            var session = LazySessionContext.UnBind(sessionFactory);
            if (session != null)
            {
                CommitSession(session);
            }
        }
        static void CommitSession(ISession session)
        {
            if (session.Transaction != null && session.Transaction.IsActive)
            {
                session.Transaction.Commit();
            }

            session.Dispose();
        }

        static void DisposeSessionIfActive(ISessionFactory sessionFactory)
        {
            var session = LazySessionContext.UnBind(sessionFactory);
            if (session == null)
            {
                return;
            }

            session.Dispose();
        }

        /// <summary>
        /// Handle:
        /// </summary>
        /// <param name="message"></param>
        void AttachSecurityContext(object message)
        {
            //TODO
        }
    }

Chris Patterson

Jun 4, 2012, 4:26:29 PM
to masstrans...@googlegroups.com
That's the general idea, can't be certain without testing.

Mick Delaney

Jun 4, 2012, 5:53:08 PM
to masstrans...@googlegroups.com
I'm testing it at the moment.

I have noticed that I seem to get 2 invocations of a given handler for 1 message...

Is there anything obvious you can see that would cause that?

 public class WindsorConsumerFactoryConfigurator
    {
        readonly SubscriptionBusServiceConfigurator _configurator;
        readonly IWindsorContainer _container;

        public WindsorConsumerFactoryConfigurator(SubscriptionBusServiceConfigurator configurator, IWindsorContainer container)
        {
            _container = container;
            _configurator = configurator;
        }

        public void ConfigureConsumer(Type messageType)
        {
            this.FastInvoke(new[] { messageType }, "Configure");
        }

        public void Configure<T>() where T : class, IConsumer
        {
            _configurator.Consumer(new WindsorConsumerFactory<T>(_container));
        }
    }

        public static void CreateMasstransitConsumersFromContainer(this SubscriptionBusServiceConfigurator configurator, IWindsorContainer container)
        {
            if (configurator == null)
            {
                throw new ArgumentNullException("configurator");
            }
            if (container == null)
            {
                throw new ArgumentNullException("container");
            }

            var consumerTypes = container.FindTypes<IConsumer>(x => x.Implements<ISaga>() == false);
            if (consumerTypes.Count > 0)
            {
                var consumerConfigurator = new ElevateWindsorConsumerFactoryConfigurator(configurator, container);
                foreach (var type in consumerTypes)
                {
                    consumerConfigurator.ConfigureConsumer(type);
                }
            }

            var sagaTypes = container.FindTypes<ISaga>(x => true);
            if (sagaTypes.Count > 0)
            {
                var sagaConfigurator = new WindsorSagaFactoryConfigurator(configurator, container);

                foreach (Type type in sagaTypes)
                {
                    sagaConfigurator.ConfigureSaga(type);
                }
            }
        }

Chris Patterson

Jun 4, 2012, 9:24:07 PM
to masstrans...@googlegroups.com
Is it handling distinct types? It seems like a consumer type might be returned multiple times or something.
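
If the container scan can hand back the same type more than once, de-duplicating the list before registering would rule that out. A small sketch only: `ConsumerTypes.Deduplicate` is a hypothetical helper, and I'm not claiming that `FindTypes` really returns duplicates.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ConsumerTypes
{
    // Hypothetical helper: drops repeated Type entries so that each consumer
    // type is registered with the bus only once.
    public static IList<Type> Deduplicate(IEnumerable<Type> consumerTypes)
    {
        return consumerTypes.Distinct().ToList();
    }
}
```

The result could then be looped over in place of the raw scan result before calling ConfigureConsumer.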


Henrik Feldt

Jun 7, 2012, 11:32:53 AM
to masstrans...@googlegroups.com

You are in a serialized scope there. Intentional?


Martin Nilsson

Jun 8, 2012, 2:07:12 PM
to masstrans...@googlegroups.com
I'm not really sure what you mean by "you are in a serialized scope there".

Martin Nilsson

Jun 8, 2012, 2:20:24 PM
to masstrans...@googlegroups.com
My intention is to wrap all handlers in a single transaction. This is the approach I used to not take part in a distributed transaction.
Then I will have support for idempotent messages if the outer transaction fails.

Martin Nilsson

Jun 10, 2012, 9:51:25 AM
to masstrans...@googlegroups.com
It doesn't feel right to have it in a class that's responsible for the lifetime of a consumer. Also, I want the transaction to span over all consumers.
Something like this:

Receive:
begin
  consumers = get consumers for message
  start transaction
    for each consumer consume message
  end transaction
end

If an exception is thrown in one of the consumers, then the transaction will not be committed.
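
A minimal sketch of that receive loop, assuming the consumers are plain delegates rather than MT's real consumer types. `TransactionalDispatcher` and `Dispatch` are made-up names for illustration, not anything that exists in MT:

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

public static class TransactionalDispatcher
{
    // Hypothetical helper: runs every consumer for one message inside a
    // single new transaction that spans all of them.
    public static void Dispatch<TMessage>(TMessage message, IEnumerable<Action<TMessage>> consumers)
    {
        using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
        {
            foreach (var consume in consumers)
            {
                consume(message); // an exception here skips Complete()
            }

            scope.Complete(); // reached only if every consumer succeeded
        }
        // Disposing the scope without Complete() rolls the transaction back,
        // and the exception keeps propagating to the caller.
    }
}
```

The difference from the Before/After hooks is that Complete() sits on the success path only, so a throwing consumer can never lead to a commit.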

My current approach is to add functionality to MT to be able to replace ServiceBusReceiveContext.DeliverMessagesToConsumers with a custom implementation (IDispatcher.Dispatch(IReceiveContext context)).

With this functionality it will also, hopefully, be possible to implement ordering of messages by implementing a dispatcher that saves messages to an ordered persistent queue.

Any comments from the experts? This is just an idea from having a look at the MT source.

Chris Patterson

Jun 10, 2012, 5:43:14 PM
to masstrans...@googlegroups.com
You're looking way too deep into MT to implement what you want. You need to be looking at the OutboundPipeline to apply that type of behavior. And things like ordering and such at the level you're suggesting are going to ultimately fail due to lack of memory, threads, and all sorts of nasty things.

Ordering is an application level concern, not a messaging transport level concern.

Transactions are at the transport level (which is why transaction scopes are created before the message is read from the queue). If you need to apply a transaction at the receiver level of the transport for all consumers on something like RabbitMQ that doesn't support transactions, you could decorate the endpoint with a transactional endpoint that would simulate the transaction. But that will likely fail at some point as well since RabbitMQ is not transactional. 

You could write a resource manager on top of RabbitMQ that would include it in the transaction scope, not sure if that's been done before or not.

Just some random thoughts while I'm low on energy, so any of these may be complete BS.

Martin Nilsson

Jun 11, 2012, 5:32:01 AM
to masstrans...@googlegroups.com
Thanks for your answers.

inline 

On Sun, Jun 10, 2012 at 11:43 PM, Chris Patterson <ch...@phatboyg.com> wrote:
You're looking way too deep into MT to implement what you want.

Martin: Maybe I am, but I think this extension point would open up some great possibilities.

You need to be looking at the OutboundPipeline to apply that type of behavior. And things like ordering and such at the level you're suggesting are going to ultimately fail due to lack of memory, threads, and all sorts of nasty things.

Martin: OutboundPipeline? Isn't that the one for publishing messages? I want to receive the incoming message from the queue and then handle the dispatching to the handlers. I will still call the internal dispatcher, but I will be able to do other things before the call to "consume", such as wrapping it in a new transaction, or deciding that the message should be ignored while still letting it through so that the ACK removes it from the queue.

Ordering is an application level concern, not a messaging transport level concern.

Martin: I agree, but without this extension I need to log all messages and then write the dispatching to the handlers, implement subscribing, and so on.
 

Transactions are at the transport level (which is why transaction scopes are created before the message is read from the queue). If you need to apply a transaction at the receiver level of the transport for all consumers on something like RabbitMQ that doesn't support transactions, you could decorate the endpoint with a transactional endpoint that would simulate the transaction. But that will likely fail at some point as well since RabbitMQ is not transactional. 

Martin: I don't need RMQ to be transactional, only the delivery to the handlers. If they fail then there will be no ACK to RMQ. If they succeed but ACK to RMQ fails then I will get the same message again but that will be ignored in the custom dispatcher because it's already marked as handled.
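
A rough sketch of that duplicate check, with an in-memory set standing in for wherever the handled message ids would really be stored. `IdempotentDispatcher` and its members are hypothetical names, not MT APIs, and a real version would presumably record the id in the same database transaction as the handlers' own changes:

```csharp
using System;
using System.Collections.Generic;

public sealed class IdempotentDispatcher
{
    // Stand-in for a persistent "handled messages" store (assumption).
    private readonly HashSet<Guid> _handled = new HashSet<Guid>();

    // Returns true if the handlers ran, false if the message was a redelivery.
    public bool Dispatch(Guid messageId, Action handlers)
    {
        if (_handled.Contains(messageId))
        {
            return false; // already handled: skip it and let the transport ACK
        }

        handlers();              // an exception here leaves the id unmarked
        _handled.Add(messageId); // mark only after the handlers succeeded
        return true;
    }
}
```

A redelivered message then passes through without touching the handlers, while a message whose handlers threw is tried again on the next delivery.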

Chris Patterson

Jun 11, 2012, 9:48:09 AM
to masstrans...@googlegroups.com
Sorry, meant InboundPipeline. Look at how the Before and After consumer methods are implemented. You could build a pipeline sink in the same way to handle your transactional behavior and insert it into the InboundPipeline to handle your transactional boundary.

Chris Patterson

Jun 11, 2012, 9:51:00 AM
to masstrans...@googlegroups.com

Martin Nilsson

Jun 11, 2012, 10:00:25 AM
to masstrans...@googlegroups.com
Yes, I tried this approach before, but I couldn't figure out whether there had been an exception in the pipeline, so I didn't know whether I should commit or not. Maybe there is a way to see that?

Chris Patterson

Jun 11, 2012, 10:07:56 AM
to masstrans...@googlegroups.com
Well, I would have to try to implement it myself to see if it could be done. As I haven't tried, it's hard to speculate.

Martin Nilsson

Jun 11, 2012, 10:16:44 AM
to masstrans...@googlegroups.com
No worries, I might have a solution. You don't have to look at it right now Chris.

Erik Källén

Mar 27, 2015, 5:09:47 AM
to masstrans...@googlegroups.com, mar...@telicminds.se
I'm having the same problem now.

Do you remember how you solved it?
