Transaction handling

瀏覽次數:557 次
跳到第一則未讀訊息

Martin Nilsson

未讀,
2012年6月4日 清晨5:11:022012/6/4
收件者:masstransit-discuss
I'm trying to wrap my consumers in an transaction. This is the link for recommendation to handle UoW
http://stackoverflow.com/questions/5569320/unit-of-work-when-using-masstransit/5572031#5572031

LocalBus = ServiceBusConfigurator.New(x =>
{
x.ReceiveFrom("loopback://localhost/mt_client");

x.BeforeConsumingMessage(() => { _before.Set(); });
x.AfterConsumingMessage(() => { _after.Set(); });
});

I'm using RabbitMQ, so no DTC.

This is my current setup

sbc.BeforeConsumingMessage(() => 
scope = new TransactionScope(TransactionScopeOption.RequiresNew); 
});
sbc.AfterConsumingMessage(() =>
{
scope.Complete();
scope.Dispose();
});

Because of AfterConsumingMessage is executing even when there is an exception the transaction will *always* be commited. 

We are replacing NServicebus with MT and in NSB I solved it by implementing IMessageDispatcherFactory.

Something like this:
public IEnumerable<Action> GetDispatcher(Type messageHandlerType, IBuilder builder, object toHandle)
{
yield return () =>
{
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
{
foreach (var action in innerFactory.GetDispatcher(messageHandlerType, builder, toHandle))
{
action.Invoke();
}
scope.Complete();
}
};
}

Could anyone please guide me on how to handle this in MT?

//martin




Chris Patterson

未讀,
2012年6月4日 中午12:24:532012/6/4
收件者:masstrans...@googlegroups.com
You need to write your own consumer factory that will manage the lifecycle of your consumer within the transaction.

And you're right, the RabbitMQ transport does not communicate with the DCT. If you want the message to say on the queue, you must throw the exception out of the consumer so that the transport can Nack it.


//martin




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-dis...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

Mick Delaney

未讀,
2012年6月4日 下午4:01:212012/6/4
收件者:masstrans...@googlegroups.com
something like this:

 public class WindsorConsumerFactory<T> : IConsumerFactory<T> where T : class
    {
        readonly ILog _logger = LogManager.GetLogger(typeof (WindsorConsumerFactory<T>));
        readonly IWindsorContainer _container;

        public WindsorConsumerFactory(IWindsorContainer container)
        {
            _container = container;
        }

        public IEnumerable<Action<IConsumeContext<TMessage>>> GetConsumer<TMessage>(IConsumeContext<TMessage> context, 
            InstanceHandlerSelector<T, TMessage> selector) where TMessage : class
        {
            using (_container.BeginScope())
            {
                var sessionFactory = BindLazySession();
                var eventDispatcher = _container.Resolve<IDispatchEvents>();

                T consumer = null;

                try
                {
                    consumer = _container.Resolve<T>();
                    if (consumer == null)
                    {
                        throw new ConfigurationException(string.Format("Unable to resolve type '{0}' from container: ", typeof(T)));
                    }

                    //TODO: Security
                    //Attach Security Context From Message Into Container As A Scoped Resource
                    //AttachSecurityContext<TMessage>();

                    foreach (var handler in selector(consumer, context))
                    {
                        yield return handler;
                    }

                    try
                    {

                        CommitLazySessionIfActive(sessionFactory);
                    }
                    catch(Exception e)
                    {
                        _logger.Error(string.Format("Exception thrown saving to database. Message Type: {0}. ", typeof(T).Name), e);
                        throw;
                    }

                    eventDispatcher.Dispatch();
                }
                finally
                {
                    _container.Release(consumer);
                }

                DisposeSessionIfActive(sessionFactory);
            }
        }
        
        ISessionFactory BindLazySession()
        {
            var sessionFactory = _container.Resolve<ISessionFactory>();
            LazySessionContext.Bind(new Lazy<ISession>(() => BeginSession(sessionFactory)), sessionFactory);
            return sessionFactory;
        }
        static ISession BeginSession(ISessionFactory sessionFactory)
        {
            var session = sessionFactory.OpenSession();
            session.BeginTransaction();
            return session;
        }

        static void CommitLazySessionIfActive(ISessionFactory sessionFactory)
        {
            var session = LazySessionContext.UnBind(sessionFactory);
            if (session != null)
            {
                CommitSession(session);
            }
        }
        static void CommitSession(ISession session)
        {
            if (session.Transaction != null && session.Transaction.IsActive)
            {
                session.Transaction.Commit();
            }

            session.Dispose();
        }

        static void DisposeSessionIfActive(ISessionFactory sessionFactory)
        {
            var session = LazySessionContext.UnBind(sessionFactory);
            if (session == null)
            {
                return;
            }

            session.Dispose();
        }

        /// <summary>
        /// Handle:
        /// </summary>
        /// <param name="message"></param>
        void AttachSecurityContext(object message)
        {
            //TODO
        }
    }

Chris Patterson

未讀,
2012年6月4日 下午4:26:292012/6/4
收件者:masstrans...@googlegroups.com
That's the general idea, can't be certain without testing.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.

Mick Delaney

未讀,
2012年6月4日 下午5:53:082012/6/4
收件者:masstrans...@googlegroups.com
i'm testing it at the moment. 

i have noticed that i seem to get 2 invocations of the a given handler for 1 message... 

is there anything obvious you can see that would cause that:

 public class WindsorConsumerFactoryConfigurator  {

        readonly SubscriptionBusServiceConfigurator _configurator;

        readonly IWindsorContainer _container;

        public WindsorConsumerFactoryConfigurator(SubscriptionBusServiceConfigurator configurator, IWindsorContainer container)  {

            _container = container;

            _configurator = configurator;

        }

        public void ConfigureConsumer(Type messageType)   {

            this.FastInvoke(new[] { messageType }, "Configure");

        }

        public void Configure<T>() where T : class, IConsumer {

            _configurator.Consumer(new WindsorConsumerFactory<T>(_container));

        }

    }


        public static void CreateMasstransitConsumersFromContainer(this SubscriptionBusServiceConfigurator configurator, IWindsorContainer container)

        {

            if (configurator == null)

            {

                throw new ArgumentNullException("configurator");

            }

            if (container == null)

            {

                throw new ArgumentNullException("container");

            }


            var consumerTypes = container.FindTypes<IConsumer>(x => x.Implements<ISaga>() == false);

            if (consumerTypes.Count > 0)

            {

                var consumerConfigurator = new ElevateWindsorConsumerFactoryConfigurator(configurator, container);

                foreach (var type in consumerTypes)

                {

                    consumerConfigurator.ConfigureConsumer(type);

                }

            }


            var sagaTypes = container.FindTypes<ISaga>(x => true);

            if (sagaTypes.Count > 0)

            {

                var sagaConfigurator = new WindsorSagaFactoryConfigurator(configurator, container);

                

                foreach (Type type in sagaTypes)

                {

                    sagaConfigurator.ConfigureSaga(type);


To post to this group, send email to masstransit-discuss@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

Chris Patterson

未讀,
2012年6月4日 晚上9:24:072012/6/4
收件者:masstrans...@googlegroups.com
Is it handling the distinct types, seems like a consumer type might be returned multiple times or something.

To view this discussion on the web visit https://groups.google.com/d/msg/masstransit-discuss/-/Sh31q3Tm168J.

To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-dis...@googlegroups.com.

Henrik Feldt

未讀,
2012年6月7日 上午11:32:532012/6/7
收件者:masstrans...@googlegroups.com

You are in a serialized scope there. Intentional?

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.

Martin Nilsson

未讀,
2012年6月8日 下午2:07:122012/6/8
收件者:masstrans...@googlegroups.com
Not really sure what you mean by "you are in a serialized scope there"

Martin Nilsson

未讀,
2012年6月8日 下午2:20:242012/6/8
收件者:masstrans...@googlegroups.com
My intention is to wrap all handlers in a single transaction. This is the approach I used to not take part in a distributed transaction.
Then I will have support for idempotent messages if the outer transaction fails.

Martin Nilsson

未讀,
2012年6月10日 上午9:51:252012/6/10
收件者:masstrans...@googlegroups.com
It doesn't feel right to have it in a class that's responsible for the lifetime of a consumer. Also, I want the transaction to span over all consumers.
Something like this:

Receive:
begin
  consumers = get consumers for message
  start transaction
    for each consumer consume message
  end transaction
end

If an exception is thrown in one of the consumers then the transaction will not be committed

My current approach is to add functionality to MT to be able to replace ServiceBusReceiveContext.DeliverMessagesToConsumers with a custom implementation (IDispatcher.Dispatch(IReceiveContext context).

With this functionality it will also, hopefully, be possible to implement ordering of messages by implementing a dispatcher that saves messages to an ordered persistent queue.

Any comments from the experts? This is just an idea from having a look at the MT source.

Chris Patterson

未讀,
2012年6月10日 下午5:43:142012/6/10
收件者:masstrans...@googlegroups.com
You're looking way to deep into MT to implement what you want. You need to be looking at the OutboundPipeline to apply that type of behavior. And things like ordering and such at the level you're suggesting is going to ultimately fail due to lack of memory, threads, and all sorts of nasty things.

Ordering is an application level concern, not a messaging transport level concern.

Transactions are at the transport level (which is why transaction scopes are created before the message is read from the queue). If you need to apply a transaction at the receiver level of the transport for all consumers on something like RabbitMQ that doesn't support transactions, you could decorate the endpoint with a transactional endpoint that would simulate the transaction. But that will likely fail at some point as well since RabbitMQ is not transactional. 

You could write a resource manager on top of RabbitMQ that would include it in the transaction scope, not sure if that's been done before or not.

Just some random thoughts while I'm low on energy, so any of these may be complete BS.

Martin Nilsson

未讀,
2012年6月11日 清晨5:32:012012/6/11
收件者:masstrans...@googlegroups.com
Thanks for your answers.

inline 

On Sun, Jun 10, 2012 at 11:43 PM, Chris Patterson <ch...@phatboyg.com> wrote:
You're looking way to deep into MT to implement what you want.

Martin: maybe I do but this extension point would open up some great possibilities, at least what I think.
 
You need to be looking at the OutboundPipeline to apply that type of behavior. And things like ordering and such at the level you're suggesting is going to ultimately fail due to lack of memory, threads, and all sorts of nasty things.

Martin: OutboundPipeline? Isn't that one for publishing messages? I want to receive the incoming message from the queue and then handle the dispatching to the handlers. I will still call the internal dispatcher but I have the possibility to do other stuff before the call to "consume". Like wrapping it in a new transaction, decide if the message should be ignored but let it through so that ACK will remove it from the queue.
 

Ordering is an application level concern, not a messaging transport level concern.

Martin: I agree but without this extension I need to log all messages and then write the dispatcher to the handlers and impl subscribing and..
 

Transactions are at the transport level (which is why transaction scopes are created before the message is read from the queue). If you need to apply a transaction at the receiver level of the transport for all consumers on something like RabbitMQ that doesn't support transactions, you could decorate the endpoint with a transactional endpoint that would simulate the transaction. But that will likely fail at some point as well since RabbitMQ is not transactional. 

Martin: I don't need RMQ to be transactional, only the delivery to the handlers. If they fail then there will be no ACK to RMQ. If they succeed but ACK to RMQ fails then I will get the same message again but that will be ignored in the custom dispatcher because it's already marked as handled.

Chris Patterson

未讀,
2012年6月11日 上午9:48:092012/6/11
收件者:masstrans...@googlegroups.com
Sorry, meant InboundPipeline. Look at how the Before and After consumer methods are implemented. You could build a pipeline sink in the same way to handle your transactional behavior and insert it into the InboundPipeline to handle your transactional boundary.

Chris Patterson

未讀,
2012年6月11日 上午9:51:002012/6/11
收件者:masstrans...@googlegroups.com

Martin Nilsson

未讀,
2012年6月11日 上午10:00:252012/6/11
收件者:masstrans...@googlegroups.com
Yes, this approach I tried before but I couldn't figure out if there had been an exception in the pipeline. So I didn't know if I should commit or not. Maybe there is a way to see that?

Chris Patterson

未讀,
2012年6月11日 上午10:07:562012/6/11
收件者:masstrans...@googlegroups.com
Well, I would have to try to implement it myself to see if it could be done, as I haven't tried it's hard to speculate.

Martin Nilsson

未讀,
2012年6月11日 上午10:16:442012/6/11
收件者:masstrans...@googlegroups.com
No worries, I might have a solution. You don't have to look at it right now Chris.

Erik Källén

未讀,
2015年3月27日 清晨5:09:472015/3/27
收件者:masstrans...@googlegroups.com、mar...@telicminds.se
I'm having the same problem now.

Do you remember how you solved it?

//martin




To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.

For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-discuss+unsub...@googlegroups.com.
回覆所有人
回覆作者
轉寄
0 則新訊息