brya...@gmail.com
May 22, 2012, 2:15:10 PM
to masstransit-discuss
Hello all!
I have a saga with three states (Initial, ReceivingRows, and Completed):
public static State Initial { get; set; }
public static State ReceivingRows { get; set; }
public static State Completed { get; set; }
It transitions from Initial to ReceivingRows when it gets a BofMessage
(where Bof = beginning of file). After the BofMessage, it receives a
large number of RowMessages, each describing a row in a flat file.
Once all RowMessages have been sent, an EofMessage is sent and the state
changes to Completed:
static void DefineSagaBehavior()
{
    Initially(When(ReceivedBof)
        .Then((saga, message) => saga.BeginFile(message))
        .TransitionTo(ReceivingRows));
    During(ReceivingRows, When(ReceivedRow)
        .Then((saga, message) => saga.AddRow(message)));
    During(ReceivingRows, When(ReceivedRowError)
        .Then((saga, message) => saga.RowError(message)));
    During(ReceivingRows, When(ReceivedEof)
        .Then((saga, message) => saga.EndFile(message))
        .TransitionTo(Completed));
}
This works, except that sometimes several RowMessages are received _before_
the BofMessage, regardless of the order in which I send them. Those
early messages are received but ultimately counted as errors, so the
corresponding rows are missing from the database or file that I
finally write them out to.
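To make the failure mode concrete, here is a minimal sketch in plain C#. It does not use any MassTransit types: `FileSagaModel`, `SagaPhase`, and the string message kinds are hypothetical stand-ins for the real saga, and "dropped" is only my simplification of "counted as an error". It just replays messages in a given arrival order against the same state rules as above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the saga states (not the MassTransit State type).
public enum SagaPhase { Initial, ReceivingRows, Completed }

public sealed class FileSagaModel
{
    public SagaPhase Phase { get; private set; } = SagaPhase.Initial;
    public int RowsStored { get; private set; }
    public int RowsDropped { get; private set; }

    // Mirrors the Initially/During rules: a message is only accepted
    // in the state that declares a handler for it.
    public void Handle(string messageKind)
    {
        switch (messageKind)
        {
            case "Bof" when Phase == SagaPhase.Initial:
                Phase = SagaPhase.ReceivingRows;
                break;
            case "Row" when Phase == SagaPhase.ReceivingRows:
                RowsStored++;
                break;
            case "Eof" when Phase == SagaPhase.ReceivingRows:
                Phase = SagaPhase.Completed;
                break;
            case "Row":
                // Row arrived outside ReceivingRows (for example before Bof):
                // assumed here to be lost.
                RowsDropped++;
                break;
        }
    }

    // Feeds the messages to a fresh model in the order they arrive.
    public static FileSagaModel Replay(IEnumerable<string> arrivalOrder)
    {
        var saga = new FileSagaModel();
        foreach (var kind in arrivalOrder)
        {
            saga.Handle(kind);
        }
        return saga;
    }
}
```

Calling `FileSagaModel.Replay(new[] { "Bof", "Row", "Row", "Eof" })` stores both rows, while `FileSagaModel.Replay(new[] { "Row", "Bof", "Row", "Eof" })` stores one row and drops the one that overtook the Bof, which is the behavior I am seeing.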
As a temporary fix, I added a little sleep-timer hack to the method
that does all the publishing:
public static void Publish(
    [NotNull] IServiceBus serviceBus,
    [NotNull] string publisherName,
    Guid correlationId,
    [NotNull] Tuple<string, string> inputFileDescriptor,
    [NotNull] string outputFileName)
{
    // attempt to load offsets
    var offsetsResult =
        OffsetParser.Parse(inputFileDescriptor.Item1);
    if (offsetsResult.Result != ParseOffsetsResult.Success)
    {
        // publish an offsets invalid message
        serviceBus.Publish<TErrorMessage>(CombGuid.Generate(),
            publisherName, inputFileDescriptor.Item2);
        return;
    }
    // publish beginning of file
    var fullInputFilePath =
        Path.GetFullPath(inputFileDescriptor.Item2);
    serviceBus.Publish<TBofMessage>(correlationId,
        publisherName, fullInputFilePath);
    // HACK: make sure bof message happens before row messages,
    // or else some row messages won't be received
    Thread.Sleep(5000);
    // publish rows from feed
    var feedResult =
        FeedParser.Parse(inputFileDescriptor.Item2, offsetsResult.Offsets);
    foreach (var row in feedResult)
    {
        // publish row message, unaligned if applicable
        if (row.Result != ParseRowResult.Success)
            serviceBus.Publish<TRowErrorMessage>(correlationId,
                publisherName, row.Fields);
        else
            serviceBus.Publish<TRowMessage>(correlationId,
                publisherName, row.Fields);
    }
    // publish end of file
    serviceBus.Publish<TEofMessage>(correlationId,
        publisherName, outputFileName);
}
It's a five-second sleep, and quite an ugly hack. Can anyone
tell me why I'm not receiving the messages in the order I send them?
If messages are unordered by default, is there a way to ensure that
these messages are delivered in the right order?
Thank you!