FileSystemMessageDataRepository and Retry.Immediate seem to collide

58 views
Skip to first unread message

Tal McMahon

unread,
Mar 22, 2017, 4:40:29 PM3/22/17
to masstransit-discuss
Hello all,
I am using  "MassTransit.RabbitMQ" version="3.5.6" targetFramework="net452"  
I have a message that I want to use a FileSystemMessageDataRepository and store the document on my filesystem.  
I followed the LosTechies Post everything works well. (Notice the retry is commented out.)

  _bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
            {
                var host = sbc.Host(new Uri(busInfo.Address), h =>
                {
                    h.Username(busInfo.UserName);
                    h.Password(busInfo.Password);
                });
                if (!string.IsNullOrEmpty(endpointName))
                {
                    sbc.ReceiveEndpoint(host, endpointName,
                        ep =>
                        {                       
                         var repoPath = manager.GetConfig("DocumentRepository");
                         var directoryInfo = new DirectoryInfo(repoPath);
                         var repo =  new FileSystemMessageDataRepository(directoryInfo);
                         ep.UseMessageData<TSubscribeMessage>(repo);
                         // ep.UseRetry(r => r.Immediate(3));
                         ep.Handler<IDocumentMessage>(BubbleUpMessage);
                        });
                }
            });



I can also use the retry logic.
When I run the following code and intentionally throw an Exception, it will call the consume method a total of 4 times and then put the message on the error queue.
_bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
            {
                var host = sbc.Host(new Uri(busInfo.Address), h =>
                {
                    h.Username(busInfo.UserName);
                    h.Password(busInfo.Password);
                });
                if (!string.IsNullOrEmpty(endpointName))
                {
                    sbc.ReceiveEndpoint(host, endpointName,
                        ep =>
                        {                       
                         //var repoPath = manager.GetConfig("DocumentRepository");
                         //var directoryInfo = new DirectoryInfo(repoPath);
                         //var repo =  new FileSystemMessageDataRepository(directoryInfo);
                         //ep.UseMessageData<TSubscribeMessage>(repo);
                         ep.UseRetry(r => r.Immediate(3));
                         ep.Handler<IDocumentMessage>(BubbleUpMessage);
                        });
                }
            });


Everything seems great until I try to run both together:
When I run this with the intentional exception, it will fail once and immediately move the message to the error queue.
_bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
            {
                var host = sbc.Host(new Uri(busInfo.Address), h =>
                {
                    h.Username(busInfo.UserName);
                    h.Password(busInfo.Password);
                });
                if (!string.IsNullOrEmpty(endpointName))
                {
                    sbc.ReceiveEndpoint(host, endpointName,
                        ep =>
                        {                       
                         var repoPath = manager.GetConfig("DocumentRepository");
                         var directoryInfo = new DirectoryInfo(repoPath);
                         var repo =  new FileSystemMessageDataRepository(directoryInfo);
                         ep.UseMessageData<TSubscribeMessage>(repo);
                         ep.UseRetry(r => r.Immediate(3));
                         ep.Handler<IDocumentMessage>(BubbleUpMessage);
                        });
                }
            });

I have tried to move the retry logic to the bus with sbc.UseRetry(r => r.Immediate(3));   -- No Love there either.

Thoughts?

I am Stymied

Tal McMahon

Chris Patterson

unread,
Mar 22, 2017, 11:07:19 PM3/22/17
to masstrans...@googlegroups.com
What is TSubscribeMessage? It should be IDocumentMessage I would think to get the message data to load...

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/0a610de9-f5c3-4c39-9290-52ede3b5c1cc%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Tal McMahon

unread,
Mar 22, 2017, 11:11:22 PM3/22/17
to masstrans...@googlegroups.com
That's just a leftover generic from my real code... Missed it when I copied over

To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsubscribe...@googlegroups.com.

--
You received this message because you are subscribed to a topic in the Google Groups "masstransit-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/masstransit-discuss/u28z96bvwYA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to masstransit-discuss+unsub...@googlegroups.com.

To post to this group, send email to masstransit-discuss@googlegroups.com.

Tal McMahon

unread,
Mar 23, 2017, 8:04:25 AM3/23/17
to masstransit-discuss
Corrected Code:

_bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
           {
               var host = sbc.Host(new Uri(busInfo.Address), h =>
               {
                   h.Username(busInfo.UserName);
                   h.Password(busInfo.Password);
               });
               if (!string.IsNullOrEmpty(endpointName))
               {
                   sbc.ReceiveEndpoint(host, endpointName,
                       ep =>
                       {                      
                        var repoPath = manager.GetConfig("DocumentRepository");
                        var directoryInfo = new DirectoryInfo(repoPath);
                        var repo =  new FileSystemMessageDataRepository(directoryInfo);
                        ep.UseMessageData<IDocumentMessage>(repo);

Tal McMahon

unread,
Mar 23, 2017, 9:14:32 AM3/23/17
to masstransit-discuss
It looks like I have found the cause.
I was sending a test message and not using any actual blob data in the test harness.
When the repository is added into the mix if there isn't any pointer in the MessageData<byte[]> it immediately faults on the retry. 
I am not sure if that is a defensive mechanism to insure that the message has a pointer or if it is a bug somewhere in the retry pipeline.
I just wanted to follow up on my findings.
 

Chris Patterson

unread,
Mar 23, 2017, 9:23:42 AM3/23/17
to masstrans...@googlegroups.com
Order matters. If you place the retry before the message data statement it will retry. It's building a pipeline so order matters. 

__
Chris Patterson




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/1f54464f-4746-4d6e-a42a-53a50d1d3096%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages