I'm not sure if that's the right way to go, but we solved this by not calling 'UseControlBus()'. I think it might be MSMQ-specific.
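A rough sketch of what that change looks like, assuming the SetupConfigurator method from the original post quoted below. The only difference is that the UseControlBus() call is left out; every other call is copied unchanged from that post. This is a configuration fragment rather than a complete program, and it is not confirmed here that it fixes the root cause instead of merely changing the startup timing.

```csharp
// Sketch only: the SetupConfigurator method from the original post,
// with the UseControlBus() call left out. Nothing else is changed.
private void SetupConfigurator(string channel, bool transactional, IWindsorContainer container, MassTransit.BusConfigurators.ServiceBusConfigurator sbc)
{
    sbc.ReceiveFrom("rabbitmq://localhost/" + channel);
    sbc.UseRabbitMqRouting();
    // sbc.UseControlBus();  // omitted: this is the workaround described above
    sbc.SetCreateTransactionalQueues(transactional);
    sbc.Subscribe(subs => this.Setup(container, subs));
    sbc.SetPurgeOnStartup(false);
}
```

If messages still show up as SKIP in the startup log after this change, the control bus was probably not the cause in your setup.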
On Tuesday, June 12, 2012 2:21:12 PM UTC+2, Erik Rydgren wrote:
Hi!
I'm developing a system that uses RabbitMQ and persistent queues. One subsystem generates messages, and another subsystem consumes those messages into sagas.
All of this works like a charm as long as both subsystems are up and running. But if I take the consumer offline, let the queue fill up with pending messages, and then restart the consumer, I get weird behavior.
It looks like the InboundRabbitMqTransport starts processing messages before all defined consumers are registered. This makes the system drop messages, and that is something we just can't afford.
Below is part of the log during startup. Note the three nasty little SKIP messages (starting at 2012-06-12 09:40:56,073) in the middle of the subscription setup. Those messages never reach a consumer and are lost forever (they are not even moved to the error queue).
Is this normal behavior, or have I missed something during setup (a snippet of the setup is below)? If this is normal, is there a way for me to defer processing of messages until ALL consumers are registered?
Best regards
Erik
******************************* Startup log ********************************************
2012-06-12 09:31:15,184 INFO [11 ] AsiPort.Service.UploadListener.UploadListenerService - Starting Upload Listener Service
2012-06-12 09:40:48,367 DEBUG [12 ] MassTransit.ConsumerSubscriptionExtensions - Subscribing Consumer: AsiPort.Service.UploadListener.FileCheckProcessor (using supplied consumer factory)
2012-06-12 09:40:48,421 DEBUG [12 ] MassTransit.ConsumerSubscriptionExtensions - Subscribing Consumer: AsiPort.Service.UploadListener.FileConvertionProcessor (using supplied consumer factory)
2012-06-12 09:40:48,422 DEBUG [12 ] MassTransit.ConsumerSubscriptionExtensions - Subscribing Consumer: AsiPort.Service.UploadListener.TempFileProcessor (using supplied consumer factory)
2012-06-12 09:40:48,423 DEBUG [12 ] MassTransit.ConsumerSubscriptionExtensions - Subscribing Consumer: AsiPort.Service.UploadListener.RemoveFileHandler (using supplied consumer factory)
2012-06-12 09:40:48,424 DEBUG [12 ] MassTransit.ConsumerSubscriptionExtensions - Subscribing Consumer: AsiPort.Service.UploadListener.FileShareProcessor (using supplied consumer factory)
2012-06-12 09:40:51,218 WARN [12 ] NHibernate.Type.CustomType - the custom type 'Magnum.StateMachine.State' handled by 'MassTransit.NHibernateIntegration.StateMachineUserType' is not Serializable:
2012-06-12 09:40:52,185 DEBUG [12 ] MassTransit.SagaSubscriptionConfiguratorExtensions - Subscribing Saga: AsiPort.Service.UploadListener.FileUploadSaga
2012-06-12 09:40:52,216 INFO [12 ] MassTransit.BusConfigurators.ServiceBusConfiguratorImpl - MassTransit v2.0.1.2, .NET Framework v4.0.30319.269
2012-06-12 09:40:52,240 DEBUG [12 ] MassTransit.BusConfigurators.ControlBusConfiguratorImpl - Configuring control bus for rabbitmq://localhost/AsiPort_UploadListener at rabbitmq://localhost/AsiPort_UploadListener_control
2012-06-12 09:40:52,242 DEBUG [12 ] MassTransit.Builders.ControlBusBuilderImpl - Creating ControlBus at rabbitmq://localhost/AsiPort_UploadListener_control
2012-06-12 09:40:54,924 DEBUG [12 ] MassTransit.Threading.ThreadPoolConsumerPool - Starting Consumer Pool for rabbitmq://localhost/AsiPort_UploadListener_control
2012-06-12 09:40:54,976 DEBUG [12 ] MassTransit.ServiceContainer - Starting bus service: MassTransit.Subscriptions.Coordinator.SubscriptionRouterService
2012-06-12 09:40:55,457 DEBUG [12 ] MassTransit.Threading.ThreadPoolConsumerPool - Starting Consumer Pool for rabbitmq://localhost/AsiPort_UploadListener
2012-06-12 09:40:55,458 DEBUG [12 ] MassTransit.ServiceContainer - Starting bus service: MassTransit.Diagnostics.MessageTraceBusService
2012-06-12 09:40:55,621 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: MassTransit.Diagnostics.GetMessageTraceList, MassTransit, 6385024e-602f-4f8b-8272-a06e00c083ef
2012-06-12 09:40:55,625 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: MassTransit.Diagnostics.GetMessageTraceList, MassTransit, 6385024e-602f-4f8b-8272-a06e00c083ef
2012-06-12 09:40:55,628 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: MassTransit.Diagnostics.GetMessageTraceList, MassTransit, a477d3bb-627a-473e-873e-a06e00c083f1
2012-06-12 09:40:55,658 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.EndpointSubscriptionConnector - Adding subscription for GetMessageTraceList on rabbitmq://localhost/AsiPort_UploadListener_control to rabbitmq://localhost/AsiPort_UploadListener_control
2012-06-12 09:40:55,705 INFO [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionConnector - Added: MassTransit.Diagnostics.GetMessageTraceList, MassTransit => rabbitmq://localhost/AsiPort_UploadListener_control, a477d3bb-627a-473e-873e-a06e00c083f1
2012-06-12 09:40:55,708 DEBUG [12 ] MassTransit.ServiceContainer - Starting bus service: MassTransit.Subscriptions.Coordinator.SubscriptionRouterService
2012-06-12 09:40:55,709 DEBUG [12 ] MassTransit.ServiceContainer - Starting bus service: MassTransit.Subscriptions.SubscriptionBusService
2012-06-12 09:40:55,722 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.FileToBeVerifiedMessage, AsiPort.Messages.Upload, 4a99e663-b5ba-4f3d-9b76-a06e00c0840e
2012-06-12 09:40:55,723 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.FileToBeVerifiedMessage, AsiPort.Messages.Upload, 4a99e663-b5ba-4f3d-9b76-a06e00c0840e
2012-06-12 09:40:55,725 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.FileToBeVerifiedMessage, AsiPort.Messages.Upload, 59434d95-b7ad-48fb-b9c4-a06e00c0840f
2012-06-12 09:40:55,780 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Service.UploadListener.IFileConvertMessage, AsiPort.Service.UploadListener, dc5ddeed-a702-45d0-af28-a06e00c0841f
2012-06-12 09:40:55,782 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Service.UploadListener.IFileConvertMessage, AsiPort.Service.UploadListener, dc5ddeed-a702-45d0-af28-a06e00c0841f
2012-06-12 09:40:55,783 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Service.UploadListener.IFileConvertMessage, AsiPort.Service.UploadListener, 8c114f93-b310-4d5f-a251-a06e00c08420
2012-06-12 09:40:56,010 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.TempFileReadyMessage, AsiPort.Messages.Upload, 20010353-b6c5-4399-b256-a06e00c08464
2012-06-12 09:40:56,020 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.TempFileReadyMessage, AsiPort.Messages.Upload, 20010353-b6c5-4399-b256-a06e00c08464
2012-06-12 09:40:56,021 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.TempFileReadyMessage, AsiPort.Messages.Upload, 5f2399d7-5837-4e20-84fb-a06e00c08467
2012-06-12 09:40:56,063 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.RemoveFileCommand, AsiPort.Messages.Upload, 7d062d02-197f-4733-a4ec-a06e00c08474
2012-06-12 09:40:56,065 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.RemoveFileCommand, AsiPort.Messages.Upload, 7d062d02-197f-4733-a4ec-a06e00c08474
2012-06-12 09:40:56,066 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.RemoveFileCommand, AsiPort.Messages.Upload, b401d018-4522-4893-9f48-a06e00c08475
2012-06-12 09:40:56,073 DEBUG [15 ] MassTransit.Transports.RabbitMq.InboundRabbitMqTransport - SKIP:rabbitmq://localhost/AsiPort_UploadListener:
2012-06-12 09:40:56,112 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.ShareFileMessage, AsiPort.Messages.Upload, b395f0d1-a252-4ea2-892e-a06e00c08483
2012-06-12 09:40:56,114 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.ShareFileMessage, AsiPort.Messages.Upload, b395f0d1-a252-4ea2-892e-a06e00c08483
2012-06-12 09:40:56,115 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.ShareFileMessage, AsiPort.Messages.Upload, b6b56ecc-f944-4221-b3d4-a06e00c08484
2012-06-12 09:40:56,120 DEBUG [16 ] MassTransit.Transports.RabbitMq.InboundRabbitMqTransport - SKIP:rabbitmq://localhost/AsiPort_UploadListener:
2012-06-12 09:40:56,122 DEBUG [16 ] MassTransit.Transports.RabbitMq.InboundRabbitMqTransport - SKIP:rabbitmq://localhost/AsiPort_UploadListener:
2012-06-12 09:40:56,157 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.RevokeSystemGrantedFileAccessMessage, AsiPort.Messages.Upload, 4e3b68ef-bf6f-4418-8c34-a06e00c08490
2012-06-12 09:40:56,161 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.RevokeSystemGrantedFileAccessMessage, AsiPort.Messages.Upload, 4e3b68ef-bf6f-4418-8c34-a06e00c08490
2012-06-12 09:40:56,163 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.RevokeSystemGrantedFileAccessMessage, AsiPort.Messages.Upload, 81df48ba-13f0-4d89-a90b-a06e00c08492
2012-06-12 09:40:56,206 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.FileVerificationResultMessage, AsiPort.Messages.Upload, 46397548-6179-4385-9d49-a06e00c0849f
2012-06-12 09:40:56,207 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.FileVerificationResultMessage, AsiPort.Messages.Upload, 46397548-6179-4385-9d49-a06e00c0849f
2012-06-12 09:40:56,208 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.FileVerificationResultMessage, AsiPort.Messages.Upload, 0992299f-92ad-43c1-b3d8-a06e00c0849f
2012-06-12 09:40:56,247 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.ShareFileResultMessage, AsiPort.Messages.Upload, dc8fa0d0-4e90-481c-a21d-a06e00c084ab
2012-06-12 09:40:56,248 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.ShareFileResultMessage, AsiPort.Messages.Upload, dc8fa0d0-4e90-481c-a21d-a06e00c084ab
2012-06-12 09:40:56,249 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.ShareFileResultMessage, AsiPort.Messages.Upload, af97d711-31d3-44d9-95c1-a06e00c084ac
2012-06-12 09:40:56,313 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.UploadedFileHashSumMessage, AsiPort.Messages.Upload, 0c009b38-8dde-4d5b-9e48-a06e00c084bf
2012-06-12 09:40:56,314 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.UploadedFileHashSumMessage, AsiPort.Messages.Upload, 0c009b38-8dde-4d5b-9e48-a06e00c084bf
2012-06-12 09:40:56,315 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.UploadedFileHashSumMessage, AsiPort.Messages.Upload, 9a75cd6b-7fc5-41ba-aa2b-a06e00c084c0
2012-06-12 09:40:56,344 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.UploadedFileMessage, AsiPort.Messages.Upload, 213393e0-c8b4-4e0b-9885-a06e00c084c8
2012-06-12 09:40:56,355 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.UploadedFileMessage, AsiPort.Messages.Upload, 213393e0-c8b4-4e0b-9885-a06e00c084c8
2012-06-12 09:40:56,356 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.UploadedFileMessage, AsiPort.Messages.Upload, 6c46bc5e-bba3-445d-b331-a06e00c084cc
2012-06-12 09:40:56,389 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionEventListener - SubscribeTo: AsiPort.Messages.Upload.FileHeaderCreatedMessage, AsiPort.Messages.Upload, 5295325c-40e9-432b-9b58-a06e00c084d6
2012-06-12 09:40:56,394 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscriptionCache - SubscribeTo: AsiPort.Messages.Upload.FileHeaderCreatedMessage, AsiPort.Messages.Upload, 5295325c-40e9-432b-9b58-a06e00c084d6
2012-06-12 09:40:56,395 DEBUG [12 ] MassTransit.Subscriptions.Coordinator.BusSubscription - SubscribeTo: AsiPort.Messages.Upload.FileHeaderCreatedMessage, AsiPort.Messages.Upload, 154b53ae-b089-49c3-8ea0-a06e00c084d8
2012-06-12 09:40:56,454 INFO [12 ] AsiPort.Service.UploadListener.UploadListenerService - Starting Upload Listener Service
******************************* MassTransit configuration ********************************************
private void SetupConfigurator(string channel, bool transactional, IWindsorContainer container, MassTransit.BusConfigurators.ServiceBusConfigurator sbc)
{
    try
    {
        sbc.ReceiveFrom("rabbitmq://localhost/" + channel);
        sbc.UseRabbitMqRouting();
        sbc.UseControlBus();
        sbc.SetCreateTransactionalQueues(transactional);
        sbc.Subscribe(subs => this.Setup(container, subs));
        sbc.SetPurgeOnStartup(false);
    }
    catch (Exception ex)
    {
        throw new ApplicationException("Unable to setup MassTransit!", ex);
    }
}

private void Setup(IWindsorContainer container, MassTransit.SubscriptionConfigurators.SubscriptionBusServiceConfigurator subs)
{
    subs.LoadFrom(container);
}