defer a message, and retry later


shahid

Apr 6, 2010, 2:39:20 PM
to masstransit-discuss
I am using a basic message handler to consume messages, and I need a
mechanism to defer some of the messages (perhaps put them back on the
queue) and have them sent back to the message consumer after some
time. I have seen CurrentMessage.RetryLater(), but I was not sure how
to use it for my needs. Does this put the message back in the queue? And
how does one specify a time span to associate with the retry?

Also, is there some way to specify the number of retry attempts?

Travis Smith

Apr 6, 2010, 2:52:07 PM
to masstrans...@googlegroups.com
CurrentMessage.RetryLater() will just stick the message on the end of
the queue. There is no timespan associated with the retry; the message
is simply processed again when the consumer gets back to that point in
the queue. When the only messages left are retries, you'll chew up a
lot of resources trying the message again and again.
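
To make the cost concrete, here is a minimal sketch that models the queue as an in-memory Queue<string>. It is not MassTransit code: RetryLaterSimulation, CountAttempts and attemptsUntilReady are hypothetical names, and "ready" stands in for whatever condition the handler is waiting on. It only shows that re-queueing with no delay turns the wait into a tight loop.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation, not part of MassTransit.
public static class RetryLaterSimulation
{
    // Returns how many handling attempts are made before the queue drains.
    // The awaited condition becomes true only after `attemptsUntilReady`
    // attempts have been made in total.
    public static int CountAttempts(IEnumerable<string> messages, int attemptsUntilReady)
    {
        var queue = new Queue<string>(messages);
        int attempts = 0;

        while (queue.Count > 0)
        {
            string message = queue.Dequeue();
            attempts++;

            bool ready = attempts > attemptsUntilReady;
            if (!ready)
            {
                // Same effect as the retry behaviour described above: the
                // message goes straight to the back of the queue, no delay.
                queue.Enqueue(message);
            }
            // When ready, the message is treated as handled and dropped.
        }

        return attempts;
    }
}
```

With a single message in the queue, every one of those attempts is the same message being picked up again immediately, which is where the CPU goes.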

I think building out a Saga would be a better solution but I think Dru
or Chris might provide better insight to what that might look like. My
sagas are all pretty simple. http://masstransit.pbworks.com/Sagas has
some details but no real good examples (and I think it's out of date).
There are some examples in the solution if you take a peek though.

-Travis


Shahid Mohammed

Apr 6, 2010, 5:43:56 PM
to masstrans...@googlegroups.com
Thanks Travis.

I have tried CurrentMessage.RetryLater(), and it does cause a lot of
CPU usage when the retry messages keep coming back in.

I might be able to handle this using FutureMessage with
WaitUntilAvailable set to the timespan that I want the message to be
sent back after. When I use FutureMessage though, is the message put
back into the queue? In other words, is creating a FutureMessage the
equivalent of actually consuming the message?

I guess I am concerned about the scenario where a message comes in, I
set it to wait for 5 mins, and during this period the service crashes.
In this instance, will the message be put back in the queue, or is
it lost?

Travis Smith

Apr 6, 2010, 9:43:47 PM
to masstrans...@googlegroups.com
I don't know if FutureMessage is exactly what you're looking for...
FutureMessage allows you to block the thread until a requested message
type arrives. And using FutureMessage would not place the message you
are currently processing back on the queue. What you would want to do
in this case is ensure you are using transactional queues so if there
is a failure the transaction will not be completed and you'll still
have the message.

Maybe if you describe the bigger picture we can consider other ways to
handle this as well. Why do you want to delay execution? If you are
waiting for a response message there is also the MakeRequest extension
(example: http://github.com/phatboyg/MassTransit/blob/master/src/Samples/WebRequestReply/WebRequestReply.Core/RequestReplyController.cs).
If you are waiting for another activity in a chain of needed
activities then there's Sagas, which handle these state transitions
with a lot more finesse.

-Travis

Shahid Mohammed

Apr 6, 2010, 10:23:22 PM
to masstrans...@googlegroups.com
In the scenario that I have, I will have NewOrder messages containing an OrderId, and I might get some OrderUpdate messages containing the OrderId. Due to the nature of the app, it's possible that I receive the OrderUpdate messages before the NewOrder message arrives, so when I try to process the OrderUpdate message, I will not find the corresponding Order/OrderId in the Order repository.

From what you have been summarizing so far, it appears that I may have to use a Saga for this. I will take a look at the samples in the trunk.

Thanks!

Travis Smith

Apr 7, 2010, 7:31:22 AM
to masstrans...@googlegroups.com
We have a case which might be similar.

We get a message in indicating that some activities must occur on a
given set of data. Some of the activities require that data is
entered, such as your NewOrder, before the additional activities
happen. The activity that does the coordinating sends the activity
that is required for others and waits for a response before sending
any additional messages.

static WorkflowSaga()
{
    Define(() =>
        {
            RemoveWhen(saga => saga.CurrentState == Completed);

            Initially(
                When(NewRegistration).
                    Then((saga, message) =>
                        saga.ProcessNewRegistration(message)).
                    TransitionTo(Processing));

            During(Processing,
                When(ActivityComplete).
                    Then((saga, message) =>
                        saga.ProcessActivityComplete(message)),
                When(TimeoutHasExpired).
                    Then((saga, message) =>
                        saga.ProcessTimeout(message)));
        });
}

This controls the order the messages arrive in. However, it is
possible that you don't have any control over the order the messages
arrive in. In that case, another option might be to implement
Consumes<OrderUpdate>.Selected instead of Consumes<OrderUpdate>.All.
In Accept(OrderUpdate message), you can check whether the order has
been inserted; if not, reject the message and it will stay in the
queue. This will slightly reduce the CPU load since the queue items
are only iterated when a new message arrives or a wait timeout occurs
(I think on the order of 200ms right now). As an example, the
Distributor load balancing implements the .Selected interface.
Consumers are only allowed to actively work on a limited number of items
at once, and this will reject messages once that limit is reached
(http://github.com/phatboyg/MassTransit/blob/master/src/MassTransit/Distributor/Worker.cs).
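
A minimal sketch of that .Selected approach for the OrderUpdate case follows. Several things here are assumptions: the Consumes<TMessage> declaration is a stand-in that mirrors the shape described above (Accept plus Consume) so the sketch compiles on its own, and with the library referenced you would drop it and use the real one. IOrderRepository, InMemoryOrderRepository, the Status property and OrderUpdateConsumer are hypothetical names, not anything from MassTransit or from the original poster's code.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the library interfaces named in the thread (assumption:
// Selected adds Accept on top of All's Consume). Remove when using MassTransit.
public class Consumes<TMessage> where TMessage : class
{
    public interface All { void Consume(TMessage message); }
    public interface Selected : All { bool Accept(TMessage message); }
}

public class OrderUpdate
{
    public Guid OrderId { get; set; }
    public string Status { get; set; } = "";  // hypothetical payload field
}

// Hypothetical repository; the thread only says an Order repository exists.
public interface IOrderRepository
{
    bool Exists(Guid orderId);
    void ApplyUpdate(OrderUpdate update);
}

public class InMemoryOrderRepository : IOrderRepository
{
    private readonly Dictionary<Guid, string> _statusByOrderId = new Dictionary<Guid, string>();

    // Stands in for whatever the NewOrder handler does when an order arrives.
    public void Add(Guid orderId) => _statusByOrderId[orderId] = "New";
    public bool Exists(Guid orderId) => _statusByOrderId.ContainsKey(orderId);
    public void ApplyUpdate(OrderUpdate update) => _statusByOrderId[update.OrderId] = update.Status;
    public string StatusOf(Guid orderId) => _statusByOrderId[orderId];
}

public class OrderUpdateConsumer : Consumes<OrderUpdate>.Selected
{
    private readonly IOrderRepository _orders;

    public OrderUpdateConsumer(IOrderRepository orders)
    {
        _orders = orders;
    }

    // Reject the update until the matching order exists; per the description
    // above, a rejected message stays in the queue and is offered again later.
    public bool Accept(OrderUpdate message) => _orders.Exists(message.OrderId);

    // Only reached for messages that Accept let through.
    public void Consume(OrderUpdate message) => _orders.ApplyUpdate(message);
}
```

Keeping Accept down to a cheap existence check matters here, since it may run many times for the same message while the order is still missing.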

Let me know if you have additional questions.

-Travis

Shahid Mohammed

Apr 7, 2010, 12:46:12 PM
to masstrans...@googlegroups.com
Thanks a lot for the detailed reply. I have tried out Consumes<OrderUpdate>.Selected, and it works fine. :)

I will definitely take a look at the Saga option too.
