RSB: How can I gracely fully handle exceptions

156 views
Skip to first unread message

Jason Meckley

unread,
Nov 2, 2011, 10:47:31 AM11/2/11
to rhino-t...@googlegroups.com
I have a service that will automatically transfer data from one db to another. I'm using Rhino.ETL for the data transfer. I'm using RSB with RQ as the mechanism to keep the ETL running. conceptually it works likes this.

1. install the app as a service using RSB Hosting. during install send a message to run the ETL process.
2. start sevice, which will consume the message to run the ETL Process.
3. consuming the message will
    3.1. run the ETL process
    3.2. check for additional transfers that may have arrived
           3.2.1. If another transfer exists, immediately send a message to run the ETL process.
           3.2.2. if no additional transfers are pending, send a message in 10 minutes to run the ETL process.

as part of the ETL operations I implemented my own ConventionInput/OutputCommands to not use transactions and instead rely on the SystemTransaction opened by RhinoQueues.

I also implemented a message module to verify the databases are available.
public class VerifyDatabaseAvailability
    : WithLoggingMixin
      , IMessageModule
{
    private IServiceBus theBus;

    public Action TestDatabaseAvailability { get; set; }

    public VerifyDatabaseAvailability()
    {
        TestDatabaseAvailability = () =>
                                       {
                                           using (Use.Connection("source"))
                                           using (Use.Connection("destination"))
                                           {
                                           }
                                       };
    }

    public void Init(ITransport transport, IServiceBus bus)
    {
        theBus = bus;
        transport.MessageArrived += CanConnectToDatabase;
    }

    public void Stop(ITransport transport, IServiceBus bus)
    {
        transport.MessageArrived -= CanConnectToDatabase;
    }

    private bool CanConnectToDatabase(CurrentMessageInformation arg)
    {
        try
        {
            TestDatabaseAvailability();
            return false;
        }
        catch
        {
            var oneHourFromNow = SystemTime.Now().AddHours(1);
            theBus.DelaySend(oneHourFromNow, arg.AllMessages);

            Info("Databases are not available. Will try again on {0:F}", oneHourFromNow);

            return true;
        }
    }
}

this handles the situations where either database may be offline for maintenance. I know this is working because I see evidence of this in the logs when we take the databases offline. However there are some instances where a TransactionAbortedException or SqlException ('Timeout expired' or 'A transport-level error') is thrown. When this occurs RSB makes 3 attempts to consume the message, which is the default behavior. The message cannot be consumed due to the exception and is moved to the error queue. Once in the error queue the system will stop receiving messages to run the ETL process. The service is running, but there are not messages being sent.

TransactionAbortedException throw within the message module above
Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport [(null)] - Failed to process message
System.Transactions.TransactionAbortedException: The transaction has aborted.
at System.Transactions.TransactionStatePromotedAborted.CreateAbortingClone(InternalTransaction tx)
at System.Transactions.DependentTransaction..ctor(IsolationLevel isoLevel, InternalTransaction internalTransaction, Boolean blocking)
at System.Transactions.Transaction.DependentClone(DependentCloneOption cloneOption)
at System.Transactions.TransactionScope.SetCurrent(Transaction newCurrent)
at System.Transactions.TransactionScope.PushScope()
at System.Transactions.TransactionScope..ctor(TransactionScopeOption scopeOption, TransactionOptions transactionOptions)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.SendInternal(Object[] msgs, Endpoint destination, Action`1 customizeHeaders)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.Send(Endpoint endpoint, DateTime processAgainAt, Object[] msgs)
at Rhino.ServiceBus.Impl.DefaultServiceBus.DelaySend(DateTime time, Object[] msgs)
at MyProject.VerifyDatabaseAvailability.CanConnectToDatabase(CurrentMessageInformation arg)
at Rhino.ServiceBus.Transport.TransportUtil.ProcessSingleMessage(CurrentMessageInformation currentMessageInformation, Func`2 messageRecieved)
at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.ProcessMessage(Message message, TransactionScope tx, Func`2 messageRecieved, Action`2 messageCompleted, Action`1 beforeTransactionCommit)

And Sql exceptions occur within the ETL process
Rhino.Etl.Core.Pipelines.SingleThreadedPipelineExecuter [(null)] - Failed to execute operation
System.Data.SqlClient.SqlException (0x80131904): A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.)
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection)
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning()
   at System.Data.SqlClient.TdsParserStateObject.ReadSniError(TdsParserStateObject stateObj, UInt32 error)
   at System.Data.SqlClient.TdsParserStateObject.ReadSni(DbAsyncResult asyncResult, TdsParserStateObject stateObj)
   at System.Data.SqlClient.TdsParserStateObject.ReadNetworkPacket()
   at System.Data.SqlClient.TdsParserStateObject.ReadBuffer()
   at System.Data.SqlClient.TdsParserStateObject.ReadByte()
   at System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
   at System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String methodName, Boolean async)
   at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult result, String methodName, Boolean sendToPipe)
   at System.Data.SqlClient.SqlCommand.ExecuteNonQuery()
   at MyProject.DatabaseExtensions.SetArithabortOn(IDbConnection connection)
   at MyProject.MyOutputOperation.<Execute>d__0.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Pipelines.AbstractPipelineExecuter.ExecutePipeline(IEnumerable`1 pipeline)

I receive logs that inform me of the errors, I then run a reboot script which re-installs the service. It's a brute force approach, but it gets the service up and running.

I would like to add better error handling to account for the TransactionAbortedException and SqlException and tell RSB to send a message in 1 hour to try again. Conceptually I want to continue to send messages at a later time even after the message completely fails.

How can I handle this scenario with RSB? Or should I remove ESB and handle the continued running differently?

Jason Meckley

unread,
Nov 7, 2011, 1:48:26 PM11/7/11
to rhino-t...@googlegroups.com
I'm thinking of tying into the existing error logging. when a message is moved to error queue i want to send another message to try again in 1 hour. I thought the exception handling was implemented as a message handler, but I'm not seeing this in the source code.

where is the exception handling implemented?

Kaylor Mail

unread,
Nov 7, 2011, 10:34:29 PM11/7/11
to rhino-t...@googlegroups.com
It is an implementation of a message module.


On Nov 7, 2011, at 11:48 AM, Jason Meckley <jasonm...@gmail.com> wrote:

I'm thinking of tying into the existing error logging. when a message is moved to error queue i want to send another message to try again in 1 hour. I thought the exception handling was implemented as a message handler, but I'm not seeing this in the source code.

where is the exception handling implemented?

--
You received this message because you are subscribed to the Google Groups "Rhino Tools Dev" group.
To view this discussion on the web visit https://groups.google.com/d/msg/rhino-tools-dev/-/0AH8q6-dxzQJ.
To post to this group, send email to rhino-t...@googlegroups.com.
To unsubscribe from this group, send email to rhino-tools-d...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/rhino-tools-dev?hl=en.
Reply all
Reply to author
Forward
0 new messages