IBusControl.Stop fails after rabbitmq windows service restart

vitor...@farfetch.com

Nov 18, 2015, 1:24:12 PM
to masstransit-discuss
Why does bus.Stop() throw an exception after the RabbitMQ Windows service has been stopped and restarted?

using System;
using System.Threading.Tasks;
using MassTransit;

namespace ConsoleApplication5
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("1. Make sure that RabbitMQ windows service is running and press any key");
            Console.ReadKey();

            var bus = Bus.Factory.CreateUsingRabbitMq(
                cfg =>
                 {
                     var host = cfg.Host(
                            new Uri("rabbitmq://localhost/"),
                            h =>
                             {
                                 h.Username("user");
                                 h.Password("*****");
                             });

                     cfg.ReceiveEndpoint(host, "teste", ep =>
                     {
                         ep.Consumer<PayloadConsumer>();
                     });
                 });

            bus.Start();

            Console.WriteLine("2. STOP RabbitMQ windows service and press any key");
            Console.ReadKey();

            Console.WriteLine("3. START RabbitMQ windows service and press any key");
            Console.ReadKey();

            var retry = true;
            while (retry)
            {
                try
                {
                    bus.Publish(new Payload()).Wait();
                    retry = false;
                }
                catch (Exception)
                {
                    Console.WriteLine("Retrying press any key");
                    Console.ReadKey();
                } 
            }

            Console.WriteLine("4. stopping press any key");
            Console.ReadKey();

            bus.Stop();
            
            Console.WriteLine("5. This is never printed to the output :(");
            Console.ReadKey();
        }
    }

    public class Payload
    {
        public Payload()
        {
            this.MyProperty = new string('V', 1014);
        }

        public string MyProperty { get; set; }
    }

    public class PayloadConsumer : IConsumer<Payload>
    {
        public Task Consume(ConsumeContext<Payload> context)
        {
            Console.Write(".");
            return Task.FromResult<object>(null);
        }
    }
}

Can someone help me understand this behavior?
Vitor.

Wayne Brantley

Nov 19, 2015, 8:12:11 AM
to masstransit-discuss
Well, I am guessing the bus had a connection to RabbitMQ and then you killed it.
I am not sure whether MT is using the latest version of the RabbitMQ client, or taking advantage of its recent ability to reconnect.

The example you presented seems like a scenario we need to have working though!

vitor...@farfetch.com

Nov 19, 2015, 9:32:51 AM
to masstransit-discuss
I am using RabbitMQ.Client version 3.5.6 and MassTransit version 3.0.15.

No additional configuration options were used to configure the RabbitMQ client other than the ones MassTransit uses by default.
Sometimes, when running the sample program, "bus.Publish(new Payload()).Wait();" blocks indefinitely after the retry message has been printed and a key pressed.

This "blocking forever" behavior is causing problems for me. Is there any way to cancel the in-progress publish in this scenario?
(I have already tried the overload that takes a cancellation token, but even when the token is canceled the publish method won't return.)

Please help me, 
Vitor.

Wayne Brantley

Nov 19, 2015, 9:55:31 AM
to masstransit-discuss
The RabbitMQ connection factory must be configured for automatic reconnection.
See page 11 of this document:

Not sure if MT is using/setting these values?  Have a look at the source...

Chris Patterson

Nov 19, 2015, 10:13:29 AM
to masstrans...@googlegroups.com
MassTransit does not use the RabbitMQ client's reconnection feature. It reconnects by itself when faults are detected. The built-in reconnection that was added to the RabbitMQ client is random and just not my favorite way to do things.

This was working fine before, so it must be a bug introduced in .15 with the new shutdown code. I'll use your code to reproduce and fix it.

__
Chris Patterson




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/80aa513d-c836-4056-bdef-4aa8a768f51a%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

vitor...@farfetch.com

Nov 19, 2015, 10:46:01 AM
to masstransit-discuss
Hi Chris,

I have tried this code using MassTransit version 3.0.14 and RabbitMQ.Client version 3.5.6, and it always blocks during the publish.

using System;
using System.Threading;
using MassTransit;

namespace ConsoleApplication5
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("1. Make sure that RabbitMQ windows service is running and press any key");
            Console.ReadKey();

            var bus = Bus.Factory.CreateUsingRabbitMq(
                cfg =>
                 {
                     var host = cfg.Host(
                            new Uri("rabbitmq://localhost/"),
                            h =>
                             {
                                 h.Username("user");
                                 h.Password("*****");
                             });

                     cfg.ReceiveEndpoint(host, "teste", ep =>
                     {
                         ep.Handler<Payload>(x => Console.Out.WriteAsync("."));
                     });
                 });

            using (var h = bus.Start())
            {
                Console.WriteLine("2. STOP RabbitMQ windows service and press any key");
                Console.ReadKey();

                int n = 3;
                while (n-- > 0)
                {
                    try
                    {
                        var cts = new CancellationTokenSource();
                        cts.CancelAfter(3000);
                        bus.Publish(new Payload(), cts.Token).Wait();
                        Console.Write("#");
                    }
                    catch (AggregateException ex)
                    {
                        Console.WriteLine(ex.InnerException.Message);
                    }

                    Thread.Sleep(100);
                }

                Console.WriteLine("3. This is never printed to the output :(");
                Console.ReadKey();

                h.Stop();
            }
        }
    }

    public class Payload
    {
        public Payload()
        {
            this.MyProperty = new string('V', 1014);
        }

        public string MyProperty { get; set; }
    }
}

Thank you for your assistance,
Vitor.

Chris Patterson

Nov 19, 2015, 11:38:44 AM
to masstrans...@googlegroups.com
I was speaking about the error during bus.Stop() being an issue that might have been introduced in .15.

As for blocking on Publish:

Yes, if you're calling bus.Publish().Wait(), you're going to block the thread and hang the process. Asynchronous code is not meant to be called like that from an [STAThread] such as Program.cs.

You can use MassTransit.Util.TaskUtil.Await(() => bus.Publish()) to safely await from a non-async thread context.
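
For the "blocks forever" part of the question, one general .NET option is to bound the wait rather than wait unconditionally. The following is only a minimal sketch using plain Task APIs, not MassTransit's own mechanism: CompletesWithin and StuckPublish are hypothetical names, and StuckPublish merely stands in for a bus.Publish(...) call that never completes. Note that the timed-out task is abandoned, not cancelled.

```csharp
using System;
using System.Threading.Tasks;

static class BoundedWait
{
    // Returns true if the task finished within the timeout, false otherwise.
    // The task itself is NOT cancelled; the caller simply stops waiting for it.
    public static async Task<bool> CompletesWithin(Task task, TimeSpan timeout)
    {
        Task finished = await Task.WhenAny(task, Task.Delay(timeout)).ConfigureAwait(false);
        if (finished != task)
            return false;

        await task.ConfigureAwait(false); // rethrow any exception from the task
        return true;
    }
}

class Program
{
    // Hypothetical stand-in for bus.Publish(...): a task that never completes,
    // simulating a publish stuck on a dead connection.
    static Task StuckPublish()
    {
        return new TaskCompletionSource<object>().Task;
    }

    static void Main()
    {
        bool ok = BoundedWait
            .CompletesWithin(StuckPublish(), TimeSpan.FromSeconds(3))
            .GetAwaiter().GetResult();

        Console.WriteLine(ok ? "published" : "publish timed out");
    }
}
```

With the stand-in above, the program prints "publish timed out" after about three seconds. In a real application the same helper could wrap the task returned by bus.Publish(...), with the caveat that the underlying publish may still be pending when the helper returns false.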


