Jason Meckley
Nov 2, 2011, 10:47:31 AM
to rhino-t...@googlegroups.com
I have a service that automatically transfers data from one database to another. I'm using Rhino.ETL for the data transfer, and RSB (Rhino Service Bus) with RQ (Rhino Queues) as the mechanism to keep the ETL running. Conceptually it works like this:
1. Install the app as a service using RSB Hosting. During install, send a message to run the ETL process.
2. Start the service, which will consume the message to run the ETL process.
3. Consuming the message will
3.1. run the ETL process
3.2. check for additional transfers that may have arrived
3.2.1. if another transfer exists, immediately send a message to run the ETL process.
3.2.2. if no additional transfers are pending, send a message in 10 minutes to run the ETL process.
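The scheduling rule in step 3.2 boils down to something like the sketch below. It is only an illustration: `EtlSchedule`, `IdleDelay` and `NextRun` are hypothetical names that are not part of RSB or Rhino.ETL, and in the real consumer the resulting time is handed to the bus instead of being returned.

```csharp
using System;

// Hypothetical helper for illustration; not part of RSB or Rhino.ETL.
public static class EtlSchedule
{
    // Wait this long before polling again when nothing is pending (step 3.2.2).
    public static readonly TimeSpan IdleDelay = TimeSpan.FromMinutes(10);

    // Returns the time at which the next "run ETL" message should be delivered.
    public static DateTime NextRun(DateTime now, bool moreTransfersPending)
    {
        // Step 3.2.1: more work is waiting, so run again right away.
        // Step 3.2.2: otherwise check again after the idle delay.
        return moreTransfersPending ? now : now.Add(IdleDelay);
    }
}
```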
As part of the ETL operations I implemented my own ConventionInput/OutputCommands that do not open their own transactions and instead rely on the System.Transactions transaction opened by RhinoQueues.
I also implemented a message module to verify that the databases are available:
public class VerifyDatabaseAvailability
    : WithLoggingMixin
    , IMessageModule
{
    private IServiceBus theBus;

    // Exposed as a settable property so the check can be swapped out.
    public Action TestDatabaseAvailability { get; set; }

    public VerifyDatabaseAvailability()
    {
        TestDatabaseAvailability = () =>
        {
            // Opening both connections is the whole test; an exception means a database is unavailable.
            using (Use.Connection("source"))
            using (Use.Connection("destination"))
            {
            }
        };
    }

    public void Init(ITransport transport, IServiceBus bus)
    {
        theBus = bus;
        transport.MessageArrived += CanConnectToDatabase;
    }

    public void Stop(ITransport transport, IServiceBus bus)
    {
        transport.MessageArrived -= CanConnectToDatabase;
    }

    private bool CanConnectToDatabase(CurrentMessageInformation arg)
    {
        try
        {
            TestDatabaseAvailability();
            // Databases are reachable: let the message continue on to its consumer.
            return false;
        }
        catch
        {
            // Databases are unreachable: re-send the message in one hour and
            // report it as handled so the consumer does not run now.
            var oneHourFromNow = SystemTime.Now().AddHours(1);
            theBus.DelaySend(oneHourFromNow, arg.AllMessages);
            Info("Databases are not available. Will try again on {0:F}", oneHourFromNow);
            return true;
        }
    }
}
This handles the situation where either database is offline for maintenance. I know it works because I see evidence of it in the logs when we take the databases offline. However, in some instances a TransactionAbortedException or SqlException ('Timeout expired' or 'A transport-level error') is thrown. When this occurs RSB makes 3 attempts to consume the message, which is the default behavior. Because the exception recurs, the message cannot be consumed and is moved to the error queue. Once the message is in the error queue, the system stops receiving messages to run the ETL process: the service is still running, but no messages are being sent.
TransactionAbortedException thrown within the message module above:
Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport [(null)] - Failed to process message
System.Transactions.TransactionAbortedException: The transaction has aborted.
at System.Transactions.TransactionStatePromotedAborted.CreateAbortingClone(InternalTransaction tx)
at System.Transactions.DependentTransaction..ctor(IsolationLevel isoLevel, InternalTransaction internalTransaction, Boolean blocking)
at System.Transactions.Transaction.DependentClone(DependentCloneOption cloneOption)
at System.Transactions.TransactionScope.SetCurrent(Transaction newCurrent)
at System.Transactions.TransactionScope.PushScope()
at System.Transactions.TransactionScope..ctor(TransactionScopeOption scopeOption, TransactionOptions transactionOptions)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.SendInternal(Object[] msgs, Endpoint destination, Action`1 customizeHeaders)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.Send(Endpoint endpoint, DateTime processAgainAt, Object[] msgs)
at Rhino.ServiceBus.Impl.DefaultServiceBus.DelaySend(DateTime time, Object[] msgs)
at MyProject.VerifyDatabaseAvailability.CanConnectToDatabase(CurrentMessageInformation arg)
at Rhino.ServiceBus.Transport.TransportUtil.ProcessSingleMessage(CurrentMessageInformation currentMessageInformation, Func`2 messageRecieved)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.ProcessMessage(Message message, TransactionScope tx, Func`2 messageRecieved, Action`2 messageCompleted, Action`1 beforeTransactionCommit)
And SqlExceptions occur within the ETL process:
Rhino.Etl.Core.Pipelines.SingleThreadedPipelineExecuter [(null)] - Failed to execute operation
System.Data.SqlClient.SqlException (0x80131904): A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.)
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection)
at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning()
at System.Data.SqlClient.TdsParserStateObject.ReadSniError(TdsParserStateObject stateObj, UInt32 error)
at System.Data.SqlClient.TdsParserStateObject.ReadSni(DbAsyncResult asyncResult, TdsParserStateObject stateObj)
at System.Data.SqlClient.TdsParserStateObject.ReadNetworkPacket()
at System.Data.SqlClient.TdsParserStateObject.ReadBuffer()
at System.Data.SqlClient.TdsParserStateObject.ReadByte()
at System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
at System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String methodName, Boolean async)
at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult result, String methodName, Boolean sendToPipe)
at System.Data.SqlClient.SqlCommand.ExecuteNonQuery()
at MyProject.DatabaseExtensions.SetArithabortOn(IDbConnection connection)
at MyProject.MyOutputOperation.<Execute>d__0.MoveNext()
at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
at Rhino.Etl.Core.Pipelines.AbstractPipelineExecuter.ExecutePipeline(IEnumerable`1 pipeline)
I receive logs that inform me of the errors, and I then run a reboot script that re-installs the service. It's a brute-force approach, but it gets the service up and running again.
I would like to add better error handling to account for the TransactionAbortedException and SqlException and tell RSB to send a message in 1 hour to try again. Conceptually I want to continue to send messages at a later time even after the message completely fails.
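To make the intent concrete, here is a rough sketch of the behavior I am after. It is not working RSB code: `RetryLater`, `RunOrReschedule` and the delegate parameters are hypothetical names made up for illustration, and how this would plug into RSB is exactly what I am asking.

```csharp
using System;

// Hypothetical helper for illustration; none of these names come from RSB.
public static class RetryLater
{
    public static readonly TimeSpan RetryDelay = TimeSpan.FromHours(1);

    // Runs the ETL work and returns true on success. If the work throws an
    // exception the caller classifies as transient, a retry is scheduled one
    // hour from 'now' and false is returned instead of letting the failure
    // propagate. Any other exception is rethrown unchanged.
    public static bool RunOrReschedule(
        Action runEtl,
        Func<Exception, bool> isTransient,
        Action<DateTime> scheduleRetry,
        DateTime now)
    {
        try
        {
            runEtl();
            return true;
        }
        catch (Exception ex)
        {
            if (!isTransient(ex))
                throw; // unexpected failures still surface normally

            scheduleRetry(now.Add(RetryDelay));
            return false;
        }
    }
}
```

In my case `isTransient` would test for TransactionAbortedException and SqlException, and `scheduleRetry` would call DelaySend on the bus. The first stack trace suggests the send itself fails once the ambient transaction has aborted, so the scheduling call would probably have to run outside that transaction; I have not verified this.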
How can I handle this scenario with RSB? Or should I remove ESB and handle the continued running differently?