Publishing messages from consumers


Pierre Doucy

Feb 28, 2016, 11:36:06 AM
to masstransit-discuss
Hello everyone, and thank you for the great work on MassTransit!

As part of a process orchestrated by a saga, I have consumers that look something like this:

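public class MyConsumer : IConsumer<ISomeMessage>
{
    private readonly IBus _bus;

    public MyConsumer(IBus bus)
    {
        _bus = bus;
    }

    public async Task Consume(ConsumeContext<ISomeMessage> context)
    {
        // Notifies the orchestrator through the injected IBus (Queued -> Running)
        NotifyProcessStarted();

        var result = await DoSomeLongRunningStuff();

        // Notifies the orchestrator through the injected IBus (Running -> Done)
        NotifyProcessFinished(result);
    }
}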

Both NotifyProcessStarted and NotifyProcessFinished use the injected IBus to allow the orchestrator to transition states (Queued -> Running -> Done).
Now it looks like the messages are actually delivered only after the consume method returns. Is that expected behaviour? Is there a better recommended way to send messages while in a consumer?

Some more information that may or may not be helpful: I am using RabbitMQ, and PrefetchCount is set to 1 on the consumer's side.

Thank you!
Pierre

Matt Honeycutt

Feb 28, 2016, 12:55:00 PM
to masstransit-discuss
The way I understand it, you wouldn't want to do any long-running processing in your consumer.  You would want your consumers to be fairly short-lived.  

That said, I think your messages should still be published out and routed to the appropriate consumers. If those consumers share the same queue, you might want to check how many concurrent messages your receive endpoint is configured to process. I want to say 4 is the default when using RabbitMQ, but I could be wrong.

Chris Patterson

Feb 28, 2016, 2:20:07 PM
to masstrans...@googlegroups.com
You should not be using IBus in a consumer -- if you need to publish events, ConsumeContext implements IPublishEndpoint, which is where publishing happens. If you need to send, it also includes ISendEndpointProvider, which is how you get a send endpoint. In a consumer, all of the messages sent through this approach include the proper correlation identifiers (including the ConversationId and the InitiatorId) so you can trace the messages through the system. They also ensure the proper return address is included in the message.
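
For illustration, here is a minimal sketch of what the consumer from the original question might look like when it publishes and sends through the ConsumeContext instead of an injected IBus. The contracts IProcessStarted and IProcessFinished, the CorrelationId and Result properties, the stubbed DoSomeLongRunningStuff body, and the queue address rabbitmq://localhost/orchestrator are all assumptions made up for this example; they are not part of the original post.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contracts, assumed for this sketch only.
public interface ISomeMessage { Guid CorrelationId { get; } }
public interface IProcessStarted { Guid CorrelationId { get; } }
public interface IProcessFinished { Guid CorrelationId { get; } string Result { get; } }

public class MyConsumer : IConsumer<ISomeMessage>
{
    public async Task Consume(ConsumeContext<ISomeMessage> context)
    {
        // ConsumeContext implements IPublishEndpoint, so no IBus is injected.
        // Publishing through the context carries the correlation headers
        // (ConversationId, InitiatorId) forward from the consumed message.
        await context.Publish<IProcessStarted>(new { context.Message.CorrelationId });

        var result = await DoSomeLongRunningStuff();

        await context.Publish<IProcessFinished>(new
        {
            context.Message.CorrelationId,
            Result = result
        });

        // To send to a specific queue instead, the context also acts as an
        // ISendEndpointProvider. The address below is a placeholder.
        var endpoint = await context.GetSendEndpoint(new Uri("rabbitmq://localhost/orchestrator"));
        await endpoint.Send<IProcessFinished>(new
        {
            context.Message.CorrelationId,
            Result = result
        });
    }

    // Stand-in for the real work; assumed to return a string here.
    private static Task<string> DoSomeLongRunningStuff() => Task.FromResult("done");
}
```

Each Publish and Send call is awaited, so a failure surfaces inside Consume rather than being silently dropped. Whether this alone changes when the saga observes the events is not confirmed by the thread.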


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/b33a865a-a2d8-4d62-88ee-bfea60160ced%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Pierre Doucy

Feb 28, 2016, 4:22:41 PM
to masstransit-discuss
Right. I remember finding it weird when I did not see a Publish method in the context. Not sure why I missed it.

Thank you very much!

