Outbox behaviour for "local" messages


Tim Duncan

Dec 15, 2015, 7:03:25 PM
to particula...@googlegroups.com
Helpful information to include
Product name: 
  NServiceBus.SqlServer
Version:
  2.2.1
Stacktrace:

12/15/2015 13:16:56 [Information] Failed to process message
NHibernate.Exceptions.GenericADOException: could not execute update query[SQL: update OutboxRecord set Dispatched=1, DispatchedAt=? where (MessageId in (? , ?) and Dispatched=0] ---> System.Data.SqlClient.SqlException: Timeout expired.  The timeout period elapsed prior to completion of the operation or the server is not responding.
The statement has been terminated. ---> System.ComponentModel.Win32Exception: The wait operation timed out


Description:

We recently upgraded to NSB 5.2.12 from 4.6 and are trying to find the best way to implement idempotency...

I am not understanding something about how Outbox works or should work.

All of our endpoints currently share the same SQL Server database (transport, persistence and business data). We have also enabled Outbox: even though I don't believe it is needed, I wasn't expecting it to cause any trouble.

We have a set of endpoints that receive commands from a WebAPI layer. The WebAPI layer is called by client apps using REST when they want to interact with a device, for example with ActivateDevice, ConfigureDevice or DeactivateDevice commands. Each command implements the IRequest interface, which has a single RequestId property. RequestId is unique per intended user action.

Because we are using HTTP/REST, we expect each request to arrive AT LEAST ONCE, but possibly more often: as many times as it takes for the client app to get a positive acknowledgment that the message was received. Each endpoint deduplicates the commands based on RequestId by storing the RequestId in the same database as the messages. This is done with a handler, IdempotentRequestHandler : IHandleMessages<IRequest>, which is ordered first in each endpoint with order.SpecifyFirst<IdempotentRequestHandler>. The handler uses the injected NHibernate ISession to try to insert the RequestId and then does one of two things:
1. If a primary key violation exception is thrown it calls Bus.DoNotContinueDispatchingCurrentMessageToHandlers(). No further processing occurs for this command.
2. The RequestId gets inserted successfully. Processing then continues on to the intended handler of the command.
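The deduplication scheme described above can be modelled in a few lines. This is a minimal Python sketch, not the actual handler: sqlite3 stands in for the shared SQL Server database, the ProcessedRequest table name is hypothetical, and the "real" handler is reduced to recording the id.

```python
import sqlite3


def open_store() -> sqlite3.Connection:
    # In-memory stand-in for the shared database.
    # "ProcessedRequest" is a hypothetical table name; the post does not show the schema.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE ProcessedRequest (RequestId TEXT PRIMARY KEY)")
    return conn


def try_claim_request(conn: sqlite3.Connection, request_id: str) -> bool:
    """Insert the RequestId; False means a primary key violation, i.e. a duplicate."""
    try:
        conn.execute("INSERT INTO ProcessedRequest (RequestId) VALUES (?)", (request_id,))
        return True
    except sqlite3.IntegrityError:
        return False


def handle(conn: sqlite3.Connection, request_id: str, handled: list) -> None:
    # `with conn` commits on success and rolls back if an exception escapes,
    # so a failing handler would also undo the RequestId insert.
    with conn:
        if not try_claim_request(conn, request_id):
            # Duplicate: the post calls Bus.DoNotContinueDispatchingCurrentMessageToHandlers() here.
            return
        # First delivery: continue to the intended handler (modelled as recording the id).
        handled.append(request_id)


conn = open_store()
handled = []
handle(conn, "req-1", handled)
handle(conn, "req-1", handled)  # the client app retried the same REST call
handle(conn, "req-2", handled)
print(handled)  # ['req-1', 'req-2']
```

Keeping the insert in the same transaction as the business work is what makes the scheme safe: if processing fails, the RequestId is not recorded and a retry is treated as a first delivery.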

The problem occurs when one of the commands is processed for the first time. IdempotentRequestHandler runs and inserts the RequestId into the table. The command, let's say ActivateDevice, is handled, some work is done and some messages are sent to other endpoints (remember that the queues are all in the same database). After the command handling is complete, a number of seconds pass, then the exception occurs (the stack trace at the top of this post) and the entire transaction is rolled back.

If we disable Outbox and send a command with a new RequestId everything works correctly. What is happening??



ramon...@particular.net

Dec 17, 2015, 1:37:37 PM
to Particular Software


Hi Tim,


The query you show is executed once the outbox has dispatched all the outgoing messages it stored for the incoming message. It then wants to mark the outbox record as dispatched, which it apparently cannot do because the row or table is locked.
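To make this sequence concrete, here is a simplified model of an outbox in Python, with sqlite3 standing in for SQL Server. It is not the NServiceBus.NHibernate implementation: the step order (store outgoing messages in the handler's transaction, dispatch them, then mark the record as dispatched) is assumed from the explanation above, and the columns mirror the OutboxRecord schema posted in this thread.

```python
import json
import sqlite3
from datetime import datetime, timezone


def open_outbox() -> sqlite3.Connection:
    # Columns follow the OutboxRecord table posted in this thread.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """CREATE TABLE OutboxRecord (
               Id INTEGER PRIMARY KEY AUTOINCREMENT,
               MessageId TEXT NOT NULL UNIQUE,
               Dispatched INTEGER NOT NULL DEFAULT 0,
               DispatchedAt TEXT NULL,
               TransportOperations TEXT NULL)"""
    )
    return conn


def process(conn, message_id, handler, transport):
    """Simplified outbox sequence for one incoming message (a model, not NServiceBus code)."""
    # Step 1: run the handler and store its outgoing messages in the same transaction.
    with conn:
        row = conn.execute(
            "SELECT Dispatched, TransportOperations FROM OutboxRecord WHERE MessageId = ?",
            (message_id,),
        ).fetchone()
        if row is None:
            outgoing = handler()
            conn.execute(
                "INSERT INTO OutboxRecord (MessageId, TransportOperations) VALUES (?, ?)",
                (message_id, json.dumps(outgoing)),
            )
        elif row[0]:
            return  # duplicate of a message whose outgoing messages were already dispatched
        else:
            outgoing = json.loads(row[1])  # stored earlier but never dispatched: resend
    # Step 2: hand the stored operations to the transport.
    transport.extend(outgoing)
    # Step 3: mark the record as dispatched (the kind of UPDATE seen in the log above).
    with conn:
        conn.execute(
            "UPDATE OutboxRecord SET Dispatched = 1, DispatchedAt = ? "
            "WHERE MessageId = ? AND Dispatched = 0",
            (datetime.now(timezone.utc).isoformat(), message_id),
        )
```

In this model, a redelivered message never re-runs the handler: it either resends the stored operations or is skipped once they have been marked as dispatched.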

Could you please verify the indexes on the OutboxRecord table?

I would also like to know how many rows the table has.

Also, how many seconds does this take? Can you extract this from the timestamps in the log file?

Regards,
Ramon

ramon...@particular.net

Dec 17, 2015, 1:41:00 PM
to Particular Software

As a side note..

Because your (transactional) system interacts with a non-transactional system, you need deduplication, which is also what the outbox provides. You have now built your own deduplication tracking, but you can use the outbox deduplication instead by setting the message ID header. You can set the message ID based on your RequestId value by using an outgoing message mutator. When you specify the message ID, the outbox can do the deduplication for you, provided it is enabled.
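The effect of deriving the message ID from the RequestId can be shown with a small Python model. This is a sketch of the idea only, not NServiceBus code: the header name is assumed for illustration, `request_id_mutator` is a hypothetical stand-in for an outgoing message mutator, and the in-memory `seen` set stands in for the outbox's persisted OutboxRecord rows.

```python
import uuid

# Header name assumed for illustration; check the NServiceBus documentation for your version.
MESSAGE_ID_HEADER = "NServiceBus.MessageId"


def request_id_mutator(headers: dict, body: dict) -> None:
    """Hypothetical outgoing mutator: reuse the command's RequestId as its message id."""
    request_id = body.get("RequestId")
    if request_id is not None:
        headers[MESSAGE_ID_HEADER] = str(request_id)


def send(body: dict, use_mutator: bool) -> tuple:
    headers = {MESSAGE_ID_HEADER: str(uuid.uuid4())}  # default: a new id for every send
    if use_mutator:
        request_id_mutator(headers, body)
    return headers, body


class DedupingReceiver:
    """Stand-in for outbox deduplication: each message id is processed at most once.
    The real outbox keeps this state in the database, not in memory."""

    def __init__(self) -> None:
        self.seen = set()
        self.handled = []

    def receive(self, headers: dict, body: dict) -> bool:
        message_id = headers[MESSAGE_ID_HEADER]
        if message_id in self.seen:
            return False
        self.seen.add(message_id)
        self.handled.append(body)
        return True


command = {"RequestId": "5b0e9a7c", "Type": "ActivateDevice"}  # hypothetical values
with_mutator = DedupingReceiver()
for _ in range(2):  # the WebAPI sends the same request twice after a client retry
    with_mutator.receive(*send(command, use_mutator=True))
without_mutator = DedupingReceiver()
for _ in range(2):
    without_mutator.receive(*send(command, use_mutator=False))
print(len(with_mutator.handled), len(without_mutator.handled))  # 1 2
```

Without the mutator every retry gets a fresh message ID, so ID-based deduplication cannot recognise it as a duplicate; with it, retries of the same user action share one ID.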

On Wednesday, December 16, 2015 at 1:03:25 AM UTC+1, Tim Duncan wrote:

Tim Duncan

Dec 17, 2015, 11:21:33 PM
to Particular Software
Hi Ramon.

Logging shows the timing is about 30 seconds (SqlCommand timeout?) between completion of handling the ActivateDevice command and the exception being thrown.

The OutboxRecord table at the time of the exception contains 0 rows.

Here are the table and index definitions:

CREATE TABLE [dbo].[OutboxRecord](
[Id] [bigint] IDENTITY(1,1) NOT NULL,
[MessageId] [nvarchar](255) NOT NULL,
[Dispatched] [bit] NOT NULL,
[DispatchedAt] [datetime] NULL,
[TransportOperations] [nvarchar](max) NULL,
PRIMARY KEY CLUSTERED 
(
[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY],
UNIQUE NONCLUSTERED 
(
[MessageId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

CREATE NONCLUSTERED INDEX [OutboxRecord_Dispatched_Idx] ON [dbo].[OutboxRecord]
(
[Dispatched] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]

CREATE NONCLUSTERED INDEX [OutboxRecord_DispatchedAt_Idx] ON [dbo].[OutboxRecord]
(
[DispatchedAt] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]

Tim Duncan

Dec 17, 2015, 11:22:33 PM
to Particular Software
Ramon, 

Background: this is an upgrade from 4.x to 5.x so we wanted to change as little code as possible in the first pass. With that in mind we already had a deduplication solution using handlers.

Now, just to clarify your suggestion with respect to Outbox:

- I should create a message mutator for the WebAPI sendonly endpoint to set the MessageID header to the same value as the RequestId from the incoming REST call. 
- Outbox will de-duplicate the messages when they arrive at the processing endpoint having the same MessageID as a previous message.

If my understanding of this is correct, we might try this at a later date.

Regards,
Tim.

ramon...@particular.net

Dec 18, 2015, 1:05:16 PM
to Particular Software

Hi Tim,

The indexes are correct. The update query shouldn't have issues updating that single record.

Which version of NServiceBus.NHibernate are you using?

Is it possible to share the log file with log level set to DEBUG? If it contains sensitive information then you can also send it to me via sup...@particular.net 

Regards,
Ramon

Tim Duncan

Dec 20, 2015, 6:54:10 PM
to Particular Software
Ramon, 

NServiceBus.NHibernate version 6.2.5 (with NHibernate version 4.0.1.4000)

We did some more investigation over the weekend and found that the endpoint still had distributed transactions enabled. When we added busConfiguration.Transactions().DisableDistributedTransactions(), the transactions were able to complete and the "Timeout expired" exception went away.

However, a new issue has now appeared that seems to be related to auditing.
The endpoints run on the same machine that also hosts the database; the machine is called BTG-TPC2.
We have UseSingleBrokerQueue enabled and the CallbackReceiver disabled.
The ActivateDevice command is processed by the ActivateDeviceSaga, which sends commands to a DeviceProvisioner endpoint and receives the responses back via Bus.Reply(). The ActivateDeviceSaga receives the responses and processes them successfully, but an exception occurs immediately afterwards and the endpoint shuts itself down after exhausting FLRs.

If we turn Audit off, everything works correctly again and no exception is raised.

Here is the config:

configuration.EnableInstallers();
configuration.EndpointName("DeviceActivator");
configuration.ScaleOut().UseSingleBrokerQueue();
configuration.Transactions().DisableDistributedTransactions();
configuration.Transactions().DefaultTimeout(TimeSpan.FromSeconds(60));
configuration.UsePersistence<NHibernatePersistence>().RegisterManagedSessionInTheContainer();
configuration.UseTransport<SqlServerTransport>().DisableCallbackReceiver();
configuration.EnableFeature<SecondLevelRetries>();
configuration.EnableFeature<Sagas>();
configuration.EnableFeature<Audit>();
configuration.DisableFeature<AutoSubscribe>();


Here is the stack trace:

2015-12-21 09:16:41.719 +11:00 [Information] Failed to process message
NServiceBus.Unicast.Queuing.QueueNotFoundException: Failed to send message to address: [DeviceProvisioner.BTG-TPC2] ---> System.Data.SqlClient.SqlException: Invalid object name 'DeviceProvisioner.BTG-TPC2'.
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
   at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
   at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString)
   at System.Data.SqlClient.SqlCommand.RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, Boolean async, Int32 timeout, Task& task, Boolean asyncWrite, SqlDataReader ds)
   at System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method, TaskCompletionSource`1 completion, Int32 timeout, Task& task, Boolean asyncWrite)
   at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(TaskCompletionSource`1 completion, String methodName, Boolean sendToPipe, Int32 timeout, Boolean asyncWrite)
   at System.Data.SqlClient.SqlCommand.ExecuteNonQuery()
   at NServiceBus.Transports.SQLServer.SqlServerMessageSender.ExecuteQuery(TransportMessage message, SqlCommand command, SendOptions sendOptions) in c:\BuildAgent\work\c978930f5d9537fd\src\NServiceBus.SqlServer\SqlServerMessageSender.cs:line 185
   at NServiceBus.Transports.SQLServer.SqlServerMessageSender.Send(TransportMessage message, SendOptions sendOptions) in c:\BuildAgent\work\c978930f5d9537fd\src\NServiceBus.SqlServer\SqlServerMessageSender.cs:line 68
   --- End of inner exception stack trace ---
   at NServiceBus.Transports.SQLServer.SqlServerMessageSender.Send(TransportMessage message, SendOptions sendOptions) in c:\BuildAgent\work\c978930f5d9537fd\src\NServiceBus.SqlServer\SqlServerMessageSender.cs:line 112
   at NServiceBus.Transports.DefaultMessageAuditer.Audit(SendOptions sendOptions, TransportMessage transportMessage) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Audit\DefaultMessageAuditer.cs:line 55
   at CallSite.Target(Closure , CallSite , Object , SendOptions , TransportMessage )
   at NServiceBus.Transports.AuditerWrapper.Audit(SendOptions sendOptions, TransportMessage message) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Audit\DefaultMessageAuditer.cs:line 21
   at NServiceBus.AuditBehavior.Invoke(IncomingContext context, Action next) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Audit\AuditBehavior.cs:line 28
   at NServiceBus.BehaviorChain`1.InvokeNext(T context) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:line 107
   at NServiceBus.BehaviorChain`1.Invoke() in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Pipeline\BehaviorChain.cs:line 52
   at NServiceBus.Pipeline.PipelineExecutor.Execute[T](BehaviorChain`1 pipelineAction, T context) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Pipeline\PipelineExecutor.cs:line 129
   at NServiceBus.Pipeline.PipelineExecutor.InvokePipeline[TContext](IEnumerable`1 behaviors, TContext context) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Pipeline\PipelineExecutor.cs:line 85
   at NServiceBus.Pipeline.PipelineExecutor.InvokeReceivePhysicalMessagePipeline() in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Pipeline\PipelineExecutor.cs:line 102
   at NServiceBus.Unicast.UnicastBus.TransportMessageReceived(Object sender, TransportMessageReceivedEventArgs e) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Unicast\UnicastBus.cs:line 826
   at NServiceBus.Unicast.Transport.TransportReceiver.OnTransportMessageReceived(TransportMessage msg) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Unicast\Transport\TransportReceiver.cs:line 411
   at NServiceBus.Unicast.Transport.TransportReceiver.ProcessMessage(TransportMessage message) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Unicast\Transport\TransportReceiver.cs:line 344
   at NServiceBus.Unicast.Transport.TransportReceiver.TryProcess(TransportMessage message) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Unicast\Transport\TransportReceiver.cs:line 228
   at NServiceBus.Transports.SQLServer.SqlServerPollingDequeueStrategy.TryReceiveWithNativeTransaction(String sql) in c:\BuildAgent\work\c978930f5d9537fd\src\NServiceBus.SqlServer\SqlServerPollingDequeueStrategy.cs:line 357


And...

2015-12-21 09:16:41.996 +11:00 [Information] Initiating shutdown.
2015-12-21 09:16:42.013 +11:00 [Fatal] Could not forward failed message to error queue 'DeviceProvisioner.BTG-TPC2' as it could not be found.
2015-12-21 09:16:42.014 +11:00 [Fatal] Fault manager failed to process the failed message with id 9ddac1ee-ee5a-4c0c-8aa4-a5750098e60e
System.InvalidOperationException: Could not forward failed message to error queue 'DeviceProvisioner.BTG-TPC2' as it could not be found. ---> NServiceBus.Unicast.Queuing.QueueNotFoundException: Failed to send message to address: [DeviceProvisioner.BTG-TPC2] ---> System.Data.SqlClient.SqlException: Invalid object name 'DeviceProvisioner.BTG-TPC2'.
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
   at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
   at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString)
   at System.Data.SqlClient.SqlCommand.RunExecuteReaderTds(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, Boolean async, Int32 timeout, Task& task, Boolean asyncWrite, SqlDataReader ds)
   at System.Data.SqlClient.SqlCommand.RunExecuteReader(CommandBehavior cmdBehavior, RunBehavior runBehavior, Boolean returnStream, String method, TaskCompletionSource`1 completion, Int32 timeout, Task& task, Boolean asyncWrite)
   at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(TaskCompletionSource`1 completion, String methodName, Boolean sendToPipe, Int32 timeout, Boolean asyncWrite)
   at System.Data.SqlClient.SqlCommand.ExecuteNonQuery()
   at NServiceBus.Transports.SQLServer.SqlServerMessageSender.ExecuteQuery(TransportMessage message, SqlCommand command, SendOptions sendOptions) in c:\BuildAgent\work\c978930f5d9537fd\src\NServiceBus.SqlServer\SqlServerMessageSender.cs:line 185
   at NServiceBus.Transports.SQLServer.SqlServerMessageSender.Send(TransportMessage message, SendOptions sendOptions) in c:\BuildAgent\work\c978930f5d9537fd\src\NServiceBus.SqlServer\SqlServerMessageSender.cs:line 68
   --- End of inner exception stack trace ---
   at NServiceBus.Transports.SQLServer.SqlServerMessageSender.Send(TransportMessage message, SendOptions sendOptions) in c:\BuildAgent\work\c978930f5d9537fd\src\NServiceBus.SqlServer\SqlServerMessageSender.cs:line 112
   at NServiceBus.Faults.Forwarder.FaultManager.HandleProcessingAlwaysFailsForMessage(TransportMessage message, Exception e, Int32 numberOfRetries) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Faults\Forwarder\FaultManager.cs:line 88
   at NServiceBus.Faults.Forwarder.FaultManager.<>c__DisplayClass14_0.<NServiceBus.Faults.IManageMessageFailures.ProcessingAlwaysFailsForMessage>b__0() in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Faults\Forwarder\FaultManager.cs:line 43
   at NServiceBus.Faults.Forwarder.FaultManager.TryHandleFailure(Action failureHandlingAction) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Faults\Forwarder\FaultManager.cs:line 112
   --- End of inner exception stack trace ---
   at NServiceBus.Faults.Forwarder.FaultManager.TryHandleFailure(Action failureHandlingAction) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Faults\Forwarder\FaultManager.cs:line 118
   at NServiceBus.Faults.Forwarder.FaultManager.NServiceBus.Faults.IManageMessageFailures.ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Faults\Forwarder\FaultManager.cs:line 43
   at NServiceBus.Unicast.Transport.FirstLevelRetries.TryInvokeFaultManager(TransportMessage message, Exception exception, Int32 numberOfAttempts) in C:\BuildAgent\work\3206e2123f54fce4\src\NServiceBus.Core\Unicast\Transport\FirstLevelRetries.cs:line 66

Tim Duncan

Dec 23, 2015, 12:16:58 AM
to Particular Software
Just as an update to this...

The exception does not occur if we replace the Bus.Reply() in the DeviceProvisioner endpoint with a Bus.Send().

However, other endpoints will also send the same command to the DeviceProvisioner, and they will need to receive the response as well, so using Bus.Send() is not a real solution.
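The trade-off here comes from how the two calls choose a destination. As a simplified model of general NServiceBus behaviour (not a diagnosis of the error in this thread): Bus.Send() routes by the sending endpoint's MessageEndpointMappings, while Bus.Reply() routes to the reply-to address carried on the incoming message, so one handler can answer many requesters. The header name, the ProvisionDeviceResponse type and the second requester below are hypothetical.

```python
from dataclasses import dataclass, field

# Header name assumed for illustration.
REPLY_TO_HEADER = "NServiceBus.ReplyToAddress"


@dataclass
class Message:
    type_name: str
    headers: dict = field(default_factory=dict)


def send(message_type: str, sender: str, mappings: dict) -> tuple:
    """Send: the destination is looked up in the sender's static endpoint mappings."""
    return mappings[message_type], Message(message_type, {REPLY_TO_HEADER: sender})


def reply(incoming: Message, response_type: str, replier: str) -> tuple:
    """Reply: the destination is the address the incoming message asked replies to go to."""
    return incoming.headers[REPLY_TO_HEADER], Message(response_type, {REPLY_TO_HEADER: replier})


mappings = {"ProvisionDeviceCommand": "DeviceProvisioner"}
for requester in ("DeviceActivator", "SomeOtherRequester"):  # second name is hypothetical
    _, command = send("ProvisionDeviceCommand", requester, mappings)
    destination, _ = reply(command, "ProvisionDeviceResponse", "DeviceProvisioner")
    print(requester, "->", destination)
```

Each response goes back to whichever endpoint sent the command, which a single static mapping on the DeviceProvisioner side could not express.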

ramon...@particular.net

Dec 23, 2015, 3:38:16 AM
to Particular Software

Hi Tim,

I see the following error in your log: "Could not forward failed message to error queue 'DeviceProvisioner.BTG-TPC2' as it could not be found" and I don't completely understand why it would want to send a message to this queue in case of an error.

Are you using SQL transport routing because queues are located in different SQL Server schemas, catalogs or instances?

Can you share your configuration that defines the connection strings, message mappings and error queues for both endpoints?

Are you making use of callbacks via Bus.Send().Register(...)? I don't think so, because you explicitly call `.DisableCallbackReceiver()`.

Are you using Bus.Reply() within your saga or only from your handler?

In which endpoint is the error occurring? Based on your configuration I would say `DeviceActivator`.

Regards,
Ramon

Tim Duncan

Dec 23, 2015, 9:06:46 PM
to Particular Software
Ramon,

The first Exception appears to be generated from within NServiceBus.Transports.DefaultMessageAuditer.Audit(), because a message is being sent to a queue named DeviceProvisioner.BTG-TPC2, which doesn't exist. After the FLRs are exhausted, the message is handed over to the FaultManager, which produces the second Exception in my post. Why it chooses the DeviceProvisioner.BTG-TPC2 queue as the destination for failed messages is a mystery to me as well.

We are using SQL transport because this is all hosted in Azure. Single instance, single database, single schema.

Config from DeviceActivator endpoint - 

  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="Data Source=XXX;Initial Catalog=UCP;Integrated Security=True" />
    <add name="NServiceBus/Persistence" connectionString="Data Source=XXX;Initial Catalog=UCP;Integrated Security=True" />
  </connectionStrings>
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
  <AuditConfig QueueName="audit" />
  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Messages="UCP.Messages.Provision.ProvisionDeviceCommand, UCP.Messages" Endpoint="DeviceProvisioner" />
      <add Messages="UCP.Messages.Provision.ProvisionDeviceFeatureCommand, UCP.Messages" Endpoint="FeatureProvisioner" />
      <add Messages="UCP.Messages.Notifications.NotifyConsumerOfActivatedDeviceCommand, UCP.Messages" Endpoint="ConsumerNotifier" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

Config from DeviceProvisioner endpoint - 

  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="Data Source=XXX;Initial Catalog=UCP;Integrated Security=True" />
    <add name="NServiceBus/Persistence" connectionString="Data Source=XXX;Initial Catalog=UCP;Integrated Security=True" />
  </connectionStrings>
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
  <AuditConfig QueueName="audit" />
  <UnicastBusConfig>
    <MessageEndpointMappings></MessageEndpointMappings>
  </UnicastBusConfig>


Definitely no callbacks in our code. Explicitly calling DisableCallbackReceiver() so that we don't get .<machine-name> queues.
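The missing table in both stack traces, DeviceProvisioner.BTG-TPC2, has exactly the `<endpoint>.<machine-name>` shape described here, so it is worth checking which endpoint still expects such a queue. A minimal sketch of that check, assuming the queue tables can be listed from the database: sqlite3 stands in for SQL Server (where you would query INFORMATION_SCHEMA.TABLES instead), and the table list is illustrative, not taken from the actual UCP database.

```python
import sqlite3


def callback_queue_name(endpoint: str, machine: str) -> str:
    # "<endpoint>.<machine-name>", the per-machine queue shape mentioned above.
    return f"{endpoint}.{machine}"


def missing_queues(conn: sqlite3.Connection, expected: list) -> list:
    """Return the expected queue tables that do not exist in the database."""
    # sqlite_master stands in for SQL Server's INFORMATION_SCHEMA.TABLES here.
    existing = {row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")}
    return sorted(name for name in expected if name not in existing)


conn = sqlite3.connect(":memory:")
for table in ("DeviceActivator", "DeviceProvisioner", "error", "audit"):
    conn.execute(f'CREATE TABLE "{table}" (Id INTEGER)')  # quoted: names are used as identifiers

expected = ["DeviceActivator", "DeviceProvisioner", "error", "audit",
            callback_queue_name("DeviceProvisioner", "BTG-TPC2")]
print(missing_queues(conn, expected))  # ['DeviceProvisioner.BTG-TPC2']
```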

Bus.Reply() only from the handler hosted in the DeviceProvisioner endpoint, NOT the Saga which is hosted in the DeviceActivator endpoint.

The error occurs in the DeviceActivator (the Saga endpoint) after it successfully processes the response message sent from the DeviceProvisioner (the handler endpoint doing the work).

ramon...@particular.net

Dec 30, 2015, 6:24:32 PM
to Particular Software


Hi Tim,

After some experimenting, I was able to reproduce your issue. I created two endpoints, both using the same database and the same connection string for both the transport and the persistence.

One endpoint calls "DisableCallbackReceiver()" and the other does not, and I tweaked the connection strings for each instance, deliberately putting the configurations out of sync so that the endpoints disagreed about table locations.

However, I was not able to reproduce the part of your issue where the exception appears only when auditing is enabled.

Could you please verify that configurations between endpoints are in sync?


Regards,
Ramon