Ensuring Message Reception Order

brya...@gmail.com

May 22, 2012, 2:15:10 PM
to masstransit-discuss
Hello all!

I have a saga that has 3 states; Initial, ReceivingRows, Completed -

public static State Initial { get; set; }
public static State ReceivingRows { get; set; }
public static State Completed { get; set; }

It transitions from Initial to ReceivingRows when it gets a BofMessage
(where Bof = Beginning of file). After the BofMessage, it receives a
large number of RowMessages where each describes a row in a flat file.
Once all RowMessages are sent, an EofMessage is sent and the state
changes to Completed. Observe -

static void DefineSagaBehavior()
{
    Initially(When(ReceivedBof)
        .Then((saga, message) => saga.BeginFile(message))
        .TransitionTo(ReceivingRows));

    During(ReceivingRows, When(ReceivedRow)
        .Then((saga, message) => saga.AddRow(message)));

    During(ReceivingRows, When(ReceivedRowError)
        .Then((saga, message) => saga.RowError(message)));

    During(ReceivingRows, When(ReceivedEof)
        .Then((saga, message) => saga.EndFile(message))
        .TransitionTo(Completed));
}

This works, except sometimes several RowMessages are received _before_
the BofMessage! This happens regardless of the order in which I send
them. Those early messages are received and ultimately counted as
errors, so they end up missing from the database or file that I
finally write them out to.

As a temporary fix, I add a little sleep timer hack in this method
that does all the publishing –

public static void Publish(
    [NotNull] IServiceBus serviceBus,
    [NotNull] string publisherName,
    Guid correlationId,
    [NotNull] Tuple<string, string> inputFileDescriptor,
    [NotNull] string outputFileName)
{
    // attempt to load offsets
    var offsetsResult = OffsetParser.Parse(inputFileDescriptor.Item1);
    if (offsetsResult.Result != ParseOffsetsResult.Success)
    {
        // publish an offsets invalid message
        serviceBus.Publish<TErrorMessage>(CombGuid.Generate(),
            publisherName, inputFileDescriptor.Item2);
        return;
    }

    // publish beginning of file
    var fullInputFilePath = Path.GetFullPath(inputFileDescriptor.Item2);
    serviceBus.Publish<TBofMessage>(correlationId,
        publisherName, fullInputFilePath);

    // HACK: make sure bof message happens before row messages,
    // or else some row messages won't be received
    Thread.Sleep(5000);

    // publish rows from feed
    var feedResult =
        FeedParser.Parse(inputFileDescriptor.Item2, offsetsResult.Offsets);
    foreach (var row in feedResult)
    {
        // publish row message, unaligned if applicable
        if (row.Result != ParseRowResult.Success)
            serviceBus.Publish<TRowErrorMessage>(correlationId,
                publisherName, row.Fields);
        else
            serviceBus.Publish<TRowMessage>(correlationId,
                publisherName, row.Fields);
    }

    // publish end of file
    serviceBus.Publish<TEofMessage>(correlationId,
        publisherName, outputFileName);
}

It’s a 5-second sleep timer, and it is quite an ugly hack. Can anyone
tell me why I’m not getting the messages in the order I send them?
Can I ensure that these messages arrive in the right order if they
are unordered by default?

Thank you!

Dru Sellers

May 22, 2012, 2:31:46 PM
to masstrans...@googlegroups.com
You cannot guarantee the order of messages.

You are very close though to what I think is a workable solution. 

In the BOF message, include the total number of rows.
Send all of the rows.

When the number of rows received == the row count, you can mark your saga complete.

Does that make sense?

-d


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To post to this group, send email to masstrans...@googlegroups.com.
To unsubscribe from this group, send email to masstransit-dis...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/masstransit-discuss?hl=en.


brya...@gmail.com

May 23, 2012, 8:55:16 AM
to masstrans...@googlegroups.com
I'm afraid the row count would not restore the missing rows, so I don't suppose it will work. I figure I'll just have to use two-way messaging: the saga sends a response once it receives the Bof, and the publisher waits for that response before it starts sending rows.

Symon Rottem

May 23, 2012, 9:07:37 AM
to masstrans...@googlegroups.com
The way you originally planned on doing it required that all the file part messages arrive in order, which can't be guaranteed, so you'll have to find a workaround.

You could make any file part message start a saga (rather than only the BOF message) but only end the saga when all the parts have been received. Each part would need a way of looking up which saga it belongs to (a correlator), so you'd need a unique identifier of some sort in each message. It wouldn't matter if the EOF arrived first and the BOF last, since all messages would belong to the same saga and the *first received* started it.

You'd also need a counter that indicates the total number of messages expected so the saga knows when it's time to finish. If the number of received messages is less than the expected total, then the saga keeps waiting until the rest arrive.

Also, if order is important, then you'd need some way of organizing the parts once they arrive, such as a sequence number for each part.
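
A rough sketch of that bookkeeping in plain C#, with no MassTransit types involved. Everything here is hypothetical and only for illustration: the names FileAssembly and FileAssemblies, the idea that the BOF carries the expected row count, and the assumption that each row carries a sequence number. In a real saga the same state would live on the saga instance and be looked up by the correlation id.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the per-file saga state; not a MassTransit type.
public class FileAssembly
{
    // Keyed by sequence number, so enumeration is always in file order.
    private readonly SortedDictionary<int, string[]> _rows =
        new SortedDictionary<int, string[]>();

    // Unknown until the BOF arrives, which may be after some rows.
    private int? _expectedRows;

    public void OnBof(int expectedRows)
    {
        _expectedRows = expectedRows;
    }

    // Arrival order does not matter; a redelivered row overwrites its earlier copy.
    public void OnRow(int sequence, string[] fields)
    {
        _rows[sequence] = fields;
    }

    // Complete only once the expected count is known and that many rows are in.
    public bool IsComplete
    {
        get { return _expectedRows.HasValue && _rows.Count == _expectedRows.Value; }
    }

    public IEnumerable<string[]> RowsInOrder
    {
        get { return _rows.Values; }
    }
}

// Whichever message arrives first for a correlation id creates the state.
public class FileAssemblies
{
    private readonly Dictionary<Guid, FileAssembly> _byCorrelationId =
        new Dictionary<Guid, FileAssembly>();

    public FileAssembly For(Guid correlationId)
    {
        FileAssembly assembly;
        if (!_byCorrelationId.TryGetValue(correlationId, out assembly))
        {
            assembly = new FileAssembly();
            _byCorrelationId[correlationId] = assembly;
        }
        return assembly;
    }
}
```

With this shape, a row that shows up before the BOF is simply stored rather than rejected, and the completion check does not depend on the EOF arriving last. The sketch is not thread-safe and keeps everything in memory, so treat it as a picture of the logic rather than something to drop into a consumer.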

Just food for thought.

Cheers,

Symon.

To view this discussion on the web visit https://groups.google.com/d/msg/masstransit-discuss/-/abxtHZ3PK8MJ.

Dru Sellers

May 23, 2012, 9:19:27 AM
to masstrans...@googlegroups.com
You could send a BOF that has the line count.
Each line message could contain its 'order #'.
The saga will complete when it gets all lines.
Then have a timeout on the saga so that, if it doesn't get all of the lines, it can request the missing ones.
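
For the last step, working out which lines to ask for again is just a set difference between the order numbers that were expected and the ones that arrived. A minimal sketch, assuming order numbers run from 1 up to the line count in the BOF; the MissingRows helper is a hypothetical name, and how the timeout fires and how the re-request is sent are left out:

```csharp
using System;
using System.Collections.Generic;

public static class MissingRows
{
    // Returns the order numbers in 1..expectedCount that have not arrived yet.
    // Assumption: numbering starts at 1 and has no gaps on the sending side.
    public static List<int> Find(int expectedCount, ICollection<int> received)
    {
        var seen = new HashSet<int>(received);
        var missing = new List<int>();
        for (var sequence = 1; sequence <= expectedCount; sequence++)
        {
            if (!seen.Contains(sequence))
            {
                missing.Add(sequence);
            }
        }
        return missing;
    }
}
```

For example, MissingRows.Find(5, new[] { 1, 2, 5 }) returns a list containing 3 and 4, which is what the saga would request when its timeout expires.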

-d