Proving consumers are running in parallel

1,338 views
Skip to first unread message

shadow...@gmail.com

unread,
Apr 13, 2016, 11:40:35 AM4/13/16
to masstransit-discuss
Greetings! I am researching and learning about how MassTransit and RabbitMQ work and in so doing I am trying to prove its ability to dole out work on multiple threads.
My question is, why is 'ConsumersIn' only ever the value 1? From this experiment, it does not look like work is being executed in parallel threads. There are 3 possibilities; either 1) I don't understand the impact of using the async console, Interlocked, and/or await in this scenario, 2) MT is configured wrong, or 3) MT is not doing what its supposed to.

What I am seeing is
* MassTransit is creating instances of consumers each time a message needs to be consumed (good)
* Multiple threads are being created, but only one thread is running at a time
* The one thread processes numerous messages one by one, and then another thread starts working.

What the producer is doing:
* Producer generates random numbers and submits them to the queue for processing. The producer doesn't care about the result.

What the consumer is doing:
* The only operation is a prime number calculation. (The algorithm isn't amazing, but its good for simulating work without using Thread.Sleep)

How I am counting threads:
* As control enters the Consume method, I increment a static integer variable. As control leaves the Consume method, I decrement the variable. I understand there are plenty of other places the scheduler could choose to switch threads, but statistically, it should happen sometimes in the time consuming prime number calculation... but it doesn't. The output is also always ordered and never interleaved.

Producer code:
    class Program
   
{
       
static void Main(string[] args)
       
{

           
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
           
{
                cfg
.EnablePerformanceCounters();
               
var host1 = cfg.Host(new Uri("rabbitmq://localhost"), h =>
               
{
                    h
.Username("guest");
                    h
.Password("guest");
               
});
           
});

           
for(int i = 0; i < 1000; i++)
           
{
               
Calculation c = Calculation.GenerateRandomCalculation();
               
Console.WriteLine(c.ToString());
               
Task.Run(async () => await busControl.Publish<ICalculation>(c)).GetAwaiter().GetResult();
           
}
       
}
   
}


Consumer code:

class Program
   
{
       
static void Main(string[] args)
       
{
           
try
           
{
               
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
               
{
                    cfg
.EnablePerformanceCounters();
                   
                   
var host1 = cfg.Host(new Uri("rabbitmq://localhost"), h =>
                   
{
                        h
.Username("guest");
                        h
.Password("guest");
                   
});

                    cfg
.ReceiveEndpoint(host1, "ha.calculate_queue", re =>
                   
{
                        re
.Consumer<MathConsumer>(cons =>
                       
{
                            cons
.UseConcurrencyLimit(16);
                       
});
                        re
.UseConcurrencyLimit(16);
                        re
.PrefetchCount = 16;
                   
});

               
});
               
               
Console.WriteLine("PRESS A KEY TO EXIT");
               
                busControl
.Start();
           
}
           
catch (Exception ex)
           
{
               
Console.WriteLine(ex.ToString());
           
}
           
Console.ReadKey();
       
}

       
static void ConfigureLogger()
       
{
           
const string logConfig = @"<?xml version=""1.0"" encoding=""utf-8"" ?>
<log4net>
  <root>
    <level value=""INFO"" />
    <appender-ref ref=""console"" />
  </root>
  <appender name=""console"" type=""log4net.Appender.ColoredConsoleAppender"">
    <layout type=""log4net.Layout.PatternLayout"">
      <conversionPattern value=""%m%n"" />
    </layout>
  </appender>
</log4net>"
;

           
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(logConfig)))
           
{
               
XmlConfigurator.Configure(stream);
           
}
       
}
   
}

Other supporting code:
    public interface ICalculation
   
{
       
Guid MessageKey { get; }
       
DateTime CreationTime { get; }
       
int Operand1 { get; }
       
int Operand2 { get; }
       
Operation Operator { get; }
   
}
    public enum Operation
   
{
       
Add,
       
Subtract,
       
Multiply,
       
Divide,
       
HighestPrime
   
}
   
public class MathConsumer : IConsumer<ICalculation>
   
{
       
public static int ConsumersIn = 0;

       
public async Task Consume(ConsumeContext<ICalculation> context)
       
{
           
Interlocked.Add(ref ConsumersIn, 1);
           
            await
Console.Out.WriteLineAsync($"Perform Work: {Thread.CurrentThread.ManagedThreadId} [{ConsumersIn}] {context.Message.Operand1} {context.Message.Operator} {context.Message.Operand2}");

           
int result = Calculate(context.Message.Operand1, context.Message.Operand2, context.Message.Operator);

            await
Console.Out.WriteLineAsync($"COMPLETE:     {Thread.CurrentThread.ManagedThreadId} [{ConsumersIn}] Received calculation: {context.Message.Operand1} {context.Message.Operator} {context.Message.Operand2} = {result}");

           
Interlocked.Add(ref ConsumersIn, -1);
       
}

       
public int Calculate(int op1, int op2, Operation kind)
       
{
           
switch (kind)
           
{
               
case Operation.Add:
                   
return op1 + op2;
               
case Operation.Subtract:
                   
return op1 - op2;
               
case Operation.Multiply:
                   
return op1 * op2;
               
case Operation.Divide:
                   
return op1 / op2;
               
case Operation.HighestPrime:
                   
return GetHighestPrime(op1);
           
}
           
throw new Exception("Ran out of chocolate sprinkles");
       
}

       
public int GetHighestPrime(int limit)
       
{
           
int pp = 2;
           
List<int> ps = new List<int>() { pp };
            pp
++;
            ps
.Add(pp);
           
while (pp < limit)
           
{
                pp
+= 2;
               
bool test = true;
               
foreach (int p in ps)
               
{
                   
if (pp % p == 0)
                   
{
                        test
= false;
                       
break;
                   
}
               
}
               
if (test)
               
{
                    ps
.Add(pp);
               
}
           
}
           
return ps.Last();
       
}
   
}
   
public class Calculation : ICalculation
   
{
       
private static Random Randy = new Random();

       
public static Calculation GenerateRandomCalculation()
       
{
           
return new Calculation()
           
{
               
CreationTime = DateTime.Now,
               
MessageKey = Guid.NewGuid(),
               
Operand1 = Randy.Next(500000, 1200000),
               
Operand2 = Randy.Next(),
               
Operator = Operation.HighestPrime //(Operation)(Randy.Next(0, 5))
           
};
       
}

       
public DateTime CreationTime { get; set; }

       
public Guid MessageKey { get; set; }

       
public int Operand1 { get; set; }

       
public int Operand2 { get; set; }

       
public Operation Operator { get; set; }

       
public override string ToString()
       
{
           
return $"{this.Operand1} {this.Operator} {this.Operand2}";
       
}
   
}

Program Output:

Perform Work: 17 [1] 82306 HighestPrime 2101306765
COMPLETE
:     17 [1] Received calculation: 82306 HighestPrime 2101306765 = 82307
Perform Work: 17 [1] 73410 HighestPrime 506634392
COMPLETE
:     17 [1] Received calculation: 73410 HighestPrime 506634392 = 73387
Perform Work: 11 [1] 87429 HighestPrime 1831895481
COMPLETE
:     11 [1] Received calculation: 87429 HighestPrime 1831895481 = 87427
Perform Work: 11 [1] 113519 HighestPrime 833181279
COMPLETE
:     11 [1] Received calculation: 113519 HighestPrime 833181279 = 113513
Perform Work: 11 [1] 55299 HighestPrime 367417065
COMPLETE
:     11 [1] Received calculation: 55299 HighestPrime 367417065 = 55291
Perform Work: 11 [1] 93359 HighestPrime 1265745022
COMPLETE
:     11 [1] Received calculation: 93359 HighestPrime 1265745022 = 93337
Perform Work: 11 [1] 70169 HighestPrime 555424785
COMPLETE
:     11 [1] Received calculation: 70169 HighestPrime 555424785 = 70163
Perform Work: 11 [1] 83762 HighestPrime 2086002951
COMPLETE
:     11 [1] Received calculation: 83762 HighestPrime 2086002951 = 83761
Perform Work: 11 [1] 80903 HighestPrime 2138962522
COMPLETE
:     11 [1] Received calculation: 80903 HighestPrime 2138962522 = 80897
Perform Work: 11 [1] 78802 HighestPrime 483943087
COMPLETE
:     11 [1] Received calculation: 78802 HighestPrime 483943087 = 78803
Perform Work: 11 [1] 98340 HighestPrime 2134861948
COMPLETE
:     11 [1] Received calculation: 98340 HighestPrime 2134861948 = 98327
Perform Work: 11 [1] 119054 HighestPrime 474456962
COMPLETE
:     11 [1] Received calculation: 119054 HighestPrime 474456962 = 119047
Perform Work: 11 [1] 66635 HighestPrime 1134926358
COMPLETE
:     11 [1] Received calculation: 66635 HighestPrime 1134926358 = 66629
Perform Work: 11 [1] 67769 HighestPrime 1726738900
COMPLETE
:     11 [1] Received calculation: 67769 HighestPrime 1726738900 = 67763
Perform Work: 11 [1] 74804 HighestPrime 172259014
COMPLETE
:     11 [1] Received calculation: 74804 HighestPrime 172259014 = 74797
Perform Work: 11 [1] 59227 HighestPrime 1172309114
COMPLETE
:     11 [1] Received calculation: 59227 HighestPrime 1172309114 = 59221
Perform Work: 11 [1] 100251 HighestPrime 1794829899
COMPLETE
:     11 [1] Received calculation: 100251 HighestPrime 1794829899 = 100237
Perform Work: 18 [1] 77006 HighestPrime 953064913
COMPLETE
:     18 [1] Received calculation: 77006 HighestPrime 953064913 = 77003
Perform Work: 18 [1] 76463 HighestPrime 341517405
COMPLETE
:     18 [1] Received calculation: 76463 HighestPrime 341517405 = 76463
Perform Work: 18 [1] 102985 HighestPrime 1982110506
COMPLETE
:     18 [1] Received calculation: 102985 HighestPrime 1982110506 = 102983
Perform Work: 18 [1] 119985 HighestPrime 2063288548
COMPLETE
:     18 [1] Received calculation: 119985 HighestPrime 2063288548 = 119983
Perform Work: 18 [1] 59258 HighestPrime 586859050
COMPLETE
:     18 [1] Received calculation: 59258 HighestPrime 586859050 = 59243
Perform Work: 18 [1] 88464 HighestPrime 897009467
COMPLETE
:     18 [1] Received calculation: 88464 HighestPrime 897009467 = 88463
Perform Work: 18 [1] 68910 HighestPrime 441930383
COMPLETE
:     18 [1] Received calculation: 68910 HighestPrime 441930383 = 68909
Perform Work: 18 [1] 79270 HighestPrime 1771484691
COMPLETE
:     18 [1] Received calculation: 79270 HighestPrime 1771484691 = 79259
Perform Work: 18 [1] 65086 HighestPrime 70422605
COMPLETE
:     18 [1] Received calculation: 65086 HighestPrime 70422605 = 65071
Perform Work: 18 [1] 92651 HighestPrime 516211786


Hardware info:
Windows 10 Pro on a MacBook Air (2 core, 4 logical processors, Intel i5-5257U)

shadow...@gmail.com

unread,
Apr 13, 2016, 5:01:32 PM4/13/16
to masstransit-discuss
So after rooting around in the source code and doing more experiments, I think I figured it out, but I'm not happy with the solution. 

In the MathConsumer class, I changed the Consume method to the following:


public async Task Consume(ConsumeContext<ICalculation> context)
{
     Interlocked.Add(ref ConsumersIn, 1);
     await Task.Run(async () =>
     {
         logger.Debug($"Perform Work: {Thread.CurrentThread.ManagedThreadId} [{ConsumersIn}] {context.Message.Operand1} {context.Message.Operator} {context.Message.Operand2}");
         int result = Calculate(context.Message.Operand1 / 2, context.Message.Operand2, context.Message.Operator);
               string msg = $"COMPLETE:     {Thread.CurrentThread.ManagedThreadId} [{ConsumersIn}] Received calculation: {context.Message.Operand1} {context.Message.Operator} {context.Message.Operand2} = {result}";
               logger.Debug(msg);
     });
     Interlocked.Add(ref ConsumersIn, -1);
}

Basically, I put the "work" part into an async lambda so it would off load. Lo and behold, the output is doing what I expected now (I also changed concurrency max to 5, for the sake of experiment)
2016-04-13 14:51:05,430 [4    ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 4 [1] 945711 HighestPrime 1484526
2016-04-13 14:51:05,430 [5    ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 5 [2] 609538 HighestPrime 569901223
2016-04-13 14:51:05,431 [10   ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 10 [5] 534725 HighestPrime 539668699
2016-04-13 14:51:05,431 [9    ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 9 [5] 884896 HighestPrime 2097116091
2016-04-13 14:51:09,137 [10   ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     10 [5] Received calculation: 534725 HighestPrime 539668699 = 267353
2016-04-13 14:51:09,529 [11   ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 11 [4] 753799 HighestPrime 2100588465
2016-04-13 14:51:10,150 [5    ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     5 [4] Received calculation: 609538 HighestPrime 569901223 = 304763
2016-04-13 14:51:10,686 [10   ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 10 [4] 689526 HighestPrime 62844741
2016-04-13 14:51:10,688 [5    ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 5 [5] 973556 HighestPrime 1493674430
2016-04-13 14:51:15,919 [9    ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     9 [5] Received calculation: 884896 HighestPrime 2097116091 = 442447
2016-04-13 14:51:16,013 [9    ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 9 [5] 741393 HighestPrime 2094779179
2016-04-13 14:51:17,229 [4    ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     4 [5] Received calculation: 945711 HighestPrime 1484526 = 472847
2016-04-13 14:51:17,314 [4    ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 4 [5] 1189861 HighestPrime 868390270
2016-04-13 14:51:17,864 [11   ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     11 [5] Received calculation: 753799 HighestPrime 2100588465 = 376897
2016-04-13 14:51:17,999 [11   ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 11 [5] 562762 HighestPrime 1855398372
2016-04-13 14:51:18,118 [10   ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     10 [5] Received calculation: 689526 HighestPrime 62844741 = 344759
2016-04-13 14:51:18,235 [10   ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 10 [5] 825871 HighestPrime 1651810574
2016-04-13 14:51:23,075 [11   ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     11 [5] Received calculation: 562762 HighestPrime 1855398372 = 281381
2016-04-13 14:51:23,077 [11   ] DEBUG RabbitTestConsumer.MathConsumer: Perform Work: 11 [5] 799122 HighestPrime 1027998869
2016-04-13 14:51:24,604 [5    ] DEBUG RabbitTestConsumer.MathConsumer: COMPLETE:     5 [5] Received calculation: 973556 HighestPrime 1493674430 = 486769

Is there a way to thread consumers that are written with synchronous code without doing the lambda hack?

Chris Patterson

unread,
Apr 13, 2016, 7:19:03 PM4/13/16
to masstrans...@googlegroups.com
This isn't right. Consumers should properly thread up to the consumer limit. You should not spin off a separate task. 

Maybe add Task.Yield at your consumer start? I should check closer but the consumers should distribute as expected. Hmm. 


__
Chris Patterson




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/8bdd44bb-ba12-4d0b-aee8-4dd7325cb6fb%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

shadow...@gmail.com

unread,
Apr 14, 2016, 10:56:38 AM4/14/16
to masstransit-discuss
await Task.Yield() definitely works. I removed the async lambda, and it's properly threadding up to the maximum still.


...

Chris Patterson

unread,
Apr 14, 2016, 1:36:38 PM4/14/16
to masstrans...@googlegroups.com
Interesting. I will investigate further. There used to be a Task.Yield() in the RabbitMQ consumer, but it was causing message ordering issues. But it doesn't make sense to have the consumer call the yield either.

I need to come up with a way to verify the messages are not being consumed on the rabbitmq thread. The way it's running for you now, you're seeing that the consumer pipeline is never yielding control back to the rabbitmq thread.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.

shadow...@gmail.com

unread,
Apr 14, 2016, 2:00:37 PM4/14/16
to masstransit-discuss
Thanks Chris, if there's anything else I can help with, let me know.
Message has been deleted

Tomer Yoskovich

unread,
Jun 14, 2016, 4:06:52 AM6/14/16
to masstransit-discuss
Hi Chris,

Task.Yield() is working like you suggested, but as you said, the consumer shouldn't be calling it,

Did you happen to find the problem by now ?

Thanks

On Monday, June 13, 2016 at 2:54:24 PM UTC+3, Tomer Yoskovich wrote:
Hello,

I'm now experiencing the same problem as you have,
(but calling Task.Yield doesn't help)
my prefetch is set to 10 and UseConncurencyLimit(10) but there still is only one consumer running.

is there anything I can do around it?

Roberto Lazazzera

unread,
Aug 30, 2016, 6:27:44 AM8/30/16
to masstransit-discuss
Hi Chris,
I'm having the same concurrency problem with queues that slow down from 100 mgs/sec to 2 msg/sec after upgrading to the latest version.
What do you suggest as temporary solution?
- downgrade to 3.2.1
- add Task.Yield() in the consumers

By the way, is anyone working on it? I can't find an issue on github to follow

Best regards
Roberto

Emrah Gozcu

unread,
Apr 26, 2017, 9:33:16 AM4/26/17
to masstransit-discuss
Dear Chris,

I asked this question on stackoverflow and later found this thread. It seems same issue to me. Could you please send an update about this issue. Task.Yield() not working for my case. My consumer runs single threaded.

To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

To post to this group, send email to masstrans...@googlegroups.com.

Chris Patterson

unread,
Apr 26, 2017, 9:58:57 PM4/26/17
to masstrans...@googlegroups.com
What version of MassTransit? Are you blocking your thread or doing something mean to the TPL like Thread.Sleep?

__
Chris Patterson

From: masstrans...@googlegroups.com <masstrans...@googlegroups.com> on behalf of Emrah Gozcu <emrah...@gmail.com>
Sent: Wednesday, April 26, 2017 6:33:16 AM
To: masstransit-discuss
Subject: Re: [masstransit-discuss] Re: Proving consumers are running in parallel
 
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

To post to this group, send email to masstrans...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages