Registration of Messages not consumers..

117 views
Skip to first unread message

Wayne Brantley

unread,
Nov 19, 2015, 6:54:04 PM11/19/15
to masstransit-discuss
I am struggling with how to make all this work in regards to registration.
I can create a single class that implements IConsumer<ISomeMessage>, IConsumer<ISomeOtherMessage>, IConsumer<IThirdMessage>, and IConsumer<ISomeSingleMessage>.
I could also create 4 different consumers....does not matter..less ceremony to have on class for the consumers - especially since they are just wrappers around CQRS.

I can then use your registration overload along with a LoadFrom() and everything will happily use DI and register itself.   While this works for some situations, it makes it difficult to split messages across different queues.

We ran across many issues trying to get them registered.  
My consumers always have constructor parameters (DI).  So that ruled out most ways of registering.
ep.Consumer(() => new MyMessageLogger(NEED_CONTAINER_RESOLUTION forconstructor););
ep.Consumer<MyConsumer>();  no parameterless constructor


The only way I figured out how to register them was to create TWO consumers.  One consumer per queue that we wanted:

                sbc.ReceiveEndpoint(host, "MyQueue", ep =>
                {
                    ConsumerConfiguratorCache.Configure(typeof(MyConsumerOne), ep, c.Resolve<ILifetimeScope>(), "message");
                });
                sbc.ReceiveEndpoint(host, "AnotherQueue", ep =>
                {
                    ConsumerConfiguratorCache.Configure(typeof(MyConsumerTwo), ep, c.Resolve<ILifetimeScope>(), "message");
                });

The above may be equivalent to something like this....still seems very 'consumer centric'
ep.Consumer(new AutofacConsumerFactory<MyConsumerTwo>(c.Resolve<ILifetimeScope>(), "message"));


This seems backwards.  Register the concrete class and what it implements will be registered on the endpoint.


Heavily leveraging Autofac DI in our applications, I tend to think of things in terms of registering interfaces and using those interfaces to find a concrete implementation.   So, I am thinking about registrations in terms of messages (perhaps incorrectly?).

As such, I would expect to see:

            //this would find every IConsumer<> and register each one to the concrete Consumer class it represents.
            builder.RegisterConsumers(_assemblies);

            builder.Register(c => Bus.Factory.CreateUsingRabbitMq(sbc =>
            {
                sbc.UseSerilog();
                var host = sbc.Host("rabbitmq://localhost"), h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });
                sbc.ReceiveEndpoint(host, "MyQueue", ep =>
                {
                    ep.ListenFor<ISomeMessage>();
                    ep.ListenFor<ISomeOtherMessage>();
                    ep.ListenFor<IThirdMessage>();
                });
                sbc.ReceiveEndpoint(host, "AnotherQueue", ep =>
                {
                    ep.ListenFor<ISomeSingleMessage>();
                    ep.PrefetchCount = 1;
                });
            }))

Then when a particular message comes in, MT would query the DI container for who implements IConsumer<ISomeMessage> and use that consumer.
Perhaps the above would take some sort of 'ConsumerFactory' that you called - passing in the messageType and it returns the consumer..

Perhaps AutofacConsumerFactory falls into play here - but it appears to be about a single consumer and supporting a decorator pattern.

Thoughts?



Chris Patterson

unread,
Nov 20, 2015, 1:50:15 AM11/20/15
to masstrans...@googlegroups.com
So the consumer factory is really the key for anything that is a class that consumes messages (which in turn implements the IConsumer<T> interface). Trying to do it any other way is really a bad idea -- the entire consumer structure is build around the consumer factory. The consumer factory handles lifetime scope, consumer resolution (including dependencies), and error management.

All LoadFrom() does is find the concrete types and call:

endpoint.Consumer(() => new AutofacConsumerFactory<TConsumer>());

If anything, an extension method could be added (or might already exist) to just do:

endpoint.Consumer<TConsumer>(ILifetimeScope);

That would just automatically register the consumer factory for the consumer. 

And yes, all of the message types in the consumer class would be bound to the queue for the receive endpoint.

The latter usage you delved into, where the individual IConsumer<T> registrations are added to the container (with the implementation type being a single consumer) is interesting, but leaves the developer to wire up every message type explicitly. While in the example you gave the goal was to split a single consumer across multiple queues, that's a pretty uncommon case. However, the approach is interesting and might be worth supporting more easily. I mean, really, it's just:

endpoint.Consumer<IConsumer<TMessage>>(new AutofacConsumerFactory<TConsumer>());

But in shorthand. Your example isn't passing the container lifetime scope, so that would need to be added, but you could do something along the lines of:

endpoint.ConsumerOf<TMessage>(lifetimeScope);

And as long as the type was in the container (registered as IConsumer<TMessage>) it would work.

Interesting thoughts for sure.


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/c5cd5ffd-978f-4474-aa9c-afc32d78c0dd%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Wayne Brantley

unread,
Nov 23, 2015, 2:14:18 PM11/23/15
to masstransit-discuss
This extension would help - if TConsumer was not forced to have an empty constructor..
endpoint.Consumer<TConsumer>(ILifetimeScope); 

Question - many of these registrations take ILifetimeScope, which for us is retrieved with c.Resolve<ILifeTime>().  Would it make sense to have a bus.DefaultLifetime(ILifeTime) call and all the lifetimes be used for that?  I find that Resolve<ILifeTime> in lots of places when trying to do additional registrations.

I think in my latter usage when it is by message type is the case where the developer wants to wire up all the message types explicitly.  However, keep in mind you could easily autowire them all up in a very similar way to LoadFrom(), where every closed type of IConsumer is located with Autofac and registered.

>>While in the example you gave the goal was to split a single consumer across multiple queues, that's a pretty uncommon case.
In this case, the reason we need the split is because we need to control the number of simultaneous messages that can be processed at once by message type.   The only way to do that is to set the PrefetchCount, which is set at a queue level.   

Another point to clarify - "splitting a consumer across multiple queues".  For me - a consumer is a simple class with some methods on it.  There is not any state in our 'consumer' class.  It is simply a proxy from a message to the message action.  In addition, those proxy methods simply use our CQRS to call commands/queries - so there is really only 1 or 2 lines of code per method.  From that point of view, there would not ever be a reason to have multiple consumer classes.   As I understand it, a new instance of the consumer class is created for each message received anyway - so unless your consumer is 'lots of code' or unless you want to split consumers by 'message area' or something, little reason to create multiples of them.    Anyway, what I really care about is splitting the messages up across queues.  Which consumer class implements what messages(s) seems unimportant from a MT point of view.  (Unless I am missing something important here!)

>>endpoint.ConsumerOf<TMessage>(lifetimeScope);
That would be perfect - so this would essentially create the AutofacConsumerFactory<> for that one message...that would be perfect!!! 

An we could easily have
endpoint.LoadMessagesFrom(lifetimeScope)......where this located every IConsumer<TMessage> and registered them to the consumer they were on.   (essentially the same thing the current LoadFrom does...but in reverse...instead of finding consumers and registering the messages they want....you find the messages and register the consumers they get)

Thoughts?


To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

Chris Patterson

unread,
Nov 23, 2015, 4:23:00 PM11/23/15
to masstrans...@googlegroups.com
The reason you want to keep consumers small and focused on as few messages as possible is dependency management. 

Classes are cheap to create, and cheap to destroy. They're also easy to manage in a project. I rarely have much if any behavior in a consumer, as it is akin to a controller in a web project in that it coordinates dependencies and flow, but nothing else.

If you look at how the MassTransit.Host is using endpoint and service configuration and how it's separating by convention the consumers across endpoints, that might give you some idea of the general approach of queues vs consumers.

As a comparison, in our production system we have well over 500 queues and only a couple of queues actually share multiple consumers and they're tightly connected business services that are together purely to reduce message traffic on RabbitMQ.



To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

To post to this group, send email to masstrans...@googlegroups.com.

Wayne Brantley

unread,
Nov 24, 2015, 8:59:48 AM11/24/15
to masstransit-discuss
Dependencies - all my consumers have exactly one dependency...the same dependency (a cqrs command/query processor).  The consumers are all pretty much one line of code mapping the message into CQRS layer - so that is not an issue.  

- I was thinking in terms of 'messages coming into a queue' - so that was how I was focused on registration, etc.  So assign messages to an endpoint - figure out consumers from there.
- You have taken a different approach and have designed in terms of 'consumer(s) handing a queue'.  So assign consumers to an endpoint - figure out messages from there.

I don't want to "swim upstream" here and can simply design my consumers more around the way you have done it (obviously very successfully)!   So that is fine.

1)  Maybe you would want to consider supporting the way I was thinking of it - with an additional overload or two on registration?  That is up to you - but if interested, I could attempt a pull request.

2)  Given my consumers all have a dependency is there not an easier way to register them?  I am using this:
               sbc.ReceiveEndpoint(host, "SomeQueue", ep =>
                {
                    ConsumerConfiguratorCache.Configure(typeof(SomeConsumer), ep, c.Resolve<ILifetimeScope>(), "message");
                    ep.PrefetchCount = 1;
                });

I would like to do something like:
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

To post to this group, send email to masstrans...@googlegroups.com.

Wayne Brantley

unread,
Nov 24, 2015, 9:05:19 AM11/24/15
to masstransit-discuss
(woops hit post accidentally)
    Something like:
       ep.Consumer<SomeConsumer>(lifetime)   --where SomeConsumer does not require an empty constructor.
   or something a little easier.

(It is very easy if your service has only one queue.  It is when you have multiple queues and want to split consumers across queues that is difficult)


3)  All this confusion and thought was create because I wanted one message to be handled 1 at a time (ep.PrefetchCount = 1) but that is a queue based item.  So I had to split them across queues.   I guess it would be pretty tough to have it limited by message instead of by queue?
Reply all
Reply to author
Forward
0 new messages