Greetings! I am researching and learning about how MassTransit and
RabbitMQ work and in so doing I am trying to prove its ability to dole
out work on multiple threads.
My question is, why is
'ConsumersIn' only ever the value 1? From this experiment, it does not
look like work is being executed in parallel threads. There are 3
possibilities; either 1) I don't understand the impact of using the
async console, Interlocked, and/or await in this scenario, 2) MT is
configured wrong, or 3) MT is not doing what its supposed to.
What I am seeing is
* MassTransit is creating instances of consumers each time a message needs to be consumed (good)
* Multiple threads are being created, but only one thread is running at a time
* The one thread processes numerous messages one by one, and then another thread starts working.
What the producer is doing:
* Producer generates random numbers and submits them to the queue for processing. The producer doesn't care about the result.
What the consumer is doing:
* The only operation is a prime number calculation. (The
algorithm isn't amazing, but its good for simulating work without using
Thread.Sleep)
How I am counting threads:
* As control enters
the Consume method, I increment a static integer variable. As control
leaves the Consume method, I decrement the variable. I understand there
are plenty of other places the scheduler could choose to switch threads,
but statistically, it should happen sometimes in the time consuming
prime number calculation... but it doesn't. The output is also always
ordered and never interleaved.
Producer code:
class Program
{
static void Main(string[] args)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.EnablePerformanceCounters();
var host1 = cfg.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
});
for(int i = 0; i < 1000; i++)
{
Calculation c = Calculation.GenerateRandomCalculation();
Console.WriteLine(c.ToString());
Task.Run(async () => await busControl.Publish<ICalculation>(c)).GetAwaiter().GetResult();
}
}
}
Consumer code:
class Program
{
static void Main(string[] args)
{
try
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.EnablePerformanceCounters();
var host1 = cfg.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host1, "ha.calculate_queue", re =>
{
re.Consumer<MathConsumer>(cons =>
{
cons.UseConcurrencyLimit(16);
});
re.UseConcurrencyLimit(16);
re.PrefetchCount = 16;
});
});
Console.WriteLine("PRESS A KEY TO EXIT");
busControl.Start();
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
Console.ReadKey();
}
static void ConfigureLogger()
{
const string logConfig = @"<?xml version=""1.0"" encoding=""utf-8"" ?>
<log4net>
<root>
<level value=""INFO"" />
<appender-ref ref=""console"" />
</root>
<appender name=""console"" type=""log4net.Appender.ColoredConsoleAppender"">
<layout type=""log4net.Layout.PatternLayout"">
<conversionPattern value=""%m%n"" />
</layout>
</appender>
</log4net>";
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(logConfig)))
{
XmlConfigurator.Configure(stream);
}
}
}
Other supporting code:
public interface ICalculation
{
Guid MessageKey { get; }
DateTime CreationTime { get; }
int Operand1 { get; }
int Operand2 { get; }
Operation Operator { get; }
}
public enum Operation
{
Add,
Subtract,
Multiply,
Divide,
HighestPrime
}
public class MathConsumer : IConsumer<ICalculation>
{
public static int ConsumersIn = 0;
public async Task Consume(ConsumeContext<ICalculation> context)
{
Interlocked.Add(ref ConsumersIn, 1);
await Console.Out.WriteLineAsync($"Perform
Work: {Thread.CurrentThread.ManagedThreadId} [{ConsumersIn}]
{context.Message.Operand1} {context.Message.Operator}
{context.Message.Operand2}");
int result = Calculate(context.Message.Operand1, context.Message.Operand2, context.Message.Operator);
await Console.Out.WriteLineAsync($"COMPLETE:
{Thread.CurrentThread.ManagedThreadId} [{ConsumersIn}] Received
calculation: {context.Message.Operand1} {context.Message.Operator}
{context.Message.Operand2} = {result}");
Interlocked.Add(ref ConsumersIn, -1);
}
public int Calculate(int op1, int op2, Operation kind)
{
switch (kind)
{
case Operation.Add:
return op1 + op2;
case Operation.Subtract:
return op1 - op2;
case Operation.Multiply:
return op1 * op2;
case Operation.Divide:
return op1 / op2;
case Operation.HighestPrime:
return GetHighestPrime(op1);
}
throw new Exception("Ran out of chocolate sprinkles");
}
public int GetHighestPrime(int limit)
{
int pp = 2;
List<int> ps = new List<int>() { pp };
pp++;
ps.Add(pp);
while (pp < limit)
{
pp += 2;
bool test = true;
foreach (int p in ps)
{
if (pp % p == 0)
{
test = false;
break;
}
}
if (test)
{
ps.Add(pp);
}
}
return ps.Last();
}
}
public class Calculation : ICalculation
{
private static Random Randy = new Random();
public static Calculation GenerateRandomCalculation()
{
return new Calculation()
{
CreationTime = DateTime.Now,
MessageKey = Guid.NewGuid(),
Operand1 = Randy.Next(500000, 1200000),
Operand2 = Randy.Next(),
Operator = Operation.HighestPrime //(Operation)(Randy.Next(0, 5))
};
}
public DateTime CreationTime { get; set; }
public Guid MessageKey { get; set; }
public int Operand1 { get; set; }
public int Operand2 { get; set; }
public Operation Operator { get; set; }
public override string ToString()
{
return $"{this.Operand1} {this.Operator} {this.Operand2}";
}
}
Program Output:
Perform Work: 17 [1] 82306 HighestPrime 2101306765
COMPLETE: 17 [1] Received calculation: 82306 HighestPrime 2101306765 = 82307
Perform Work: 17 [1] 73410 HighestPrime 506634392
COMPLETE: 17 [1] Received calculation: 73410 HighestPrime 506634392 = 73387
Perform Work: 11 [1] 87429 HighestPrime 1831895481
COMPLETE: 11 [1] Received calculation: 87429 HighestPrime 1831895481 = 87427
Perform Work: 11 [1] 113519 HighestPrime 833181279
COMPLETE: 11 [1] Received calculation: 113519 HighestPrime 833181279 = 113513
Perform Work: 11 [1] 55299 HighestPrime 367417065
COMPLETE: 11 [1] Received calculation: 55299 HighestPrime 367417065 = 55291
Perform Work: 11 [1] 93359 HighestPrime 1265745022
COMPLETE: 11 [1] Received calculation: 93359 HighestPrime 1265745022 = 93337
Perform Work: 11 [1] 70169 HighestPrime 555424785
COMPLETE: 11 [1] Received calculation: 70169 HighestPrime 555424785 = 70163
Perform Work: 11 [1] 83762 HighestPrime 2086002951
COMPLETE: 11 [1] Received calculation: 83762 HighestPrime 2086002951 = 83761
Perform Work: 11 [1] 80903 HighestPrime 2138962522
COMPLETE: 11 [1] Received calculation: 80903 HighestPrime 2138962522 = 80897
Perform Work: 11 [1] 78802 HighestPrime 483943087
COMPLETE: 11 [1] Received calculation: 78802 HighestPrime 483943087 = 78803
Perform Work: 11 [1] 98340 HighestPrime 2134861948
COMPLETE: 11 [1] Received calculation: 98340 HighestPrime 2134861948 = 98327
Perform Work: 11 [1] 119054 HighestPrime 474456962
COMPLETE: 11 [1] Received calculation: 119054 HighestPrime 474456962 = 119047
Perform Work: 11 [1] 66635 HighestPrime 1134926358
COMPLETE: 11 [1] Received calculation: 66635 HighestPrime 1134926358 = 66629
Perform Work: 11 [1] 67769 HighestPrime 1726738900
COMPLETE: 11 [1] Received calculation: 67769 HighestPrime 1726738900 = 67763
Perform Work: 11 [1] 74804 HighestPrime 172259014
COMPLETE: 11 [1] Received calculation: 74804 HighestPrime 172259014 = 74797
Perform Work: 11 [1] 59227 HighestPrime 1172309114
COMPLETE: 11 [1] Received calculation: 59227 HighestPrime 1172309114 = 59221
Perform Work: 11 [1] 100251 HighestPrime 1794829899
COMPLETE: 11 [1] Received calculation: 100251 HighestPrime 1794829899 = 100237
Perform Work: 18 [1] 77006 HighestPrime 953064913
COMPLETE: 18 [1] Received calculation: 77006 HighestPrime 953064913 = 77003
Perform Work: 18 [1] 76463 HighestPrime 341517405
COMPLETE: 18 [1] Received calculation: 76463 HighestPrime 341517405 = 76463
Perform Work: 18 [1] 102985 HighestPrime 1982110506
COMPLETE: 18 [1] Received calculation: 102985 HighestPrime 1982110506 = 102983
Perform Work: 18 [1] 119985 HighestPrime 2063288548
COMPLETE: 18 [1] Received calculation: 119985 HighestPrime 2063288548 = 119983
Perform Work: 18 [1] 59258 HighestPrime 586859050
COMPLETE: 18 [1] Received calculation: 59258 HighestPrime 586859050 = 59243
Perform Work: 18 [1] 88464 HighestPrime 897009467
COMPLETE: 18 [1] Received calculation: 88464 HighestPrime 897009467 = 88463
Perform Work: 18 [1] 68910 HighestPrime 441930383
COMPLETE: 18 [1] Received calculation: 68910 HighestPrime 441930383 = 68909
Perform Work: 18 [1] 79270 HighestPrime 1771484691
COMPLETE: 18 [1] Received calculation: 79270 HighestPrime 1771484691 = 79259
Perform Work: 18 [1] 65086 HighestPrime 70422605
COMPLETE: 18 [1] Received calculation: 65086 HighestPrime 70422605 = 65071
Perform Work: 18 [1] 92651 HighestPrime 516211786
Hardware info:
Windows 10 Pro on a MacBook Air (2 core, 4 logical processors, Intel i5-5257U)