I have a .NET class library project which implements RabbitMQ producers and consumers. I have added the OpenTelemetry/Jaeger trace context to the message headers, but at runtime, when I inspect the message's Headers property, it is null.
The tracing context is being generated correctly, but it seems that none of the trace context, tags, or baggage is being added to the headers.
This is the publisher code (truncated for brevity) :
```
namespace Rabbit.Comms.Client
{
public class RpcPublisher : IDisposable
{
protected virtual void Dispose(bool disposing)
{
...
...
}
public void Dispose()
{
...
GC.SuppressFinalize(this);
}
#endregion Dispose
internal Subscriber<BaseResponseMessageContext> Subscriber { get; }
internal Publisher Publisher { get; }
internal IModel Channel { get; }
internal ILogger Logger { get; }
private bool DisposeChannel { get; } = false;
private readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
private readonly ActivitySource Activity = new ActivitySource(nameof(Publisher));
private static string trackId = null;
private readonly ConcurrentDictionary<string, TaskCompletionSource<BaseResponseMessageContext>> callbackMapper =
new ConcurrentDictionary<string, TaskCompletionSource<BaseResponseMessageContext>>();
/// <summary>
/// Creates an RPC publisher on a new channel opened from <paramref name="connection"/>
/// and delegates the rest of the setup to the channel-based constructor.
/// </summary>
/// <param name="connection">Connection whose <c>CreateModel()</c> supplies the channel.</param>
/// <param name="subscriberExchangeName">Exchange name forwarded to the channel-based constructor.</param>
/// <param name="logger">Optional logger forwarded to the channel-based constructor; may be null.</param>
public RpcPublisher(IConnection connection, string subscriberExchangeName = "amq.direct", ILogger logger = null) : this(connection.CreateModel(), subscriberExchangeName, logger)
{
// The channel was created here rather than supplied by the caller, so record that fact
// (presumably consulted by Dispose, whose body is not shown — confirm).
DisposeChannel = true;
}
/// <summary>
/// Creates an RPC publisher on an existing channel: stores the logger and channel, hooks
/// <c>Channel_BasicReturn</c> onto the channel's <c>BasicReturn</c> event, and creates the
/// <see cref="Publisher"/> and the reply <see cref="Subscriber"/> on that same channel.
/// </summary>
/// <param name="channel">Channel shared by the publisher and the reply subscriber.</param>
/// <param name="subscriberExchangeName">
/// Exchange the reply subscriber binds to; when null,
/// <c>RabbitMQExtension.GetDefaultSubscriberExchangeName</c> is used instead.
/// </param>
/// <param name="logger">Optional logger; may be null.</param>
public RpcPublisher(IModel channel, string subscriberExchangeName = "amq.direct", ILogger logger = null)
{
Logger = logger;
Channel = channel;
Channel.BasicReturn += Channel_BasicReturn;
Publisher = new Publisher(channel);
// Replies are handled by ConsumerFunctionAsync; cancellation is disabled for this subscriber.
Subscriber = new Subscriber<BaseResponseMessageContext>(channel, RabbitMQExtension.GetDefaultSubscriberRoutingKey, ConsumerFunctionAsync, subscriberExchangeName ?? RabbitMQExtension.GetDefaultSubscriberExchangeName, allowCancellation: false, logger: logger);
}
...
....
private CancellationTokenSource CancellationTokenSource { get; set; } = null;
/// <summary>
/// Publishes <paramref name="message"/> as an RPC request and awaits the reply registered
/// under a freshly generated correlation id.
/// </summary>
/// <param name="routingKey">Routing key passed to the underlying publisher.</param>
/// <param name="message">
/// Message to send. If its runtime type has a writable <c>TrackingId</c> property, that
/// property is overwritten with the id derived from the activity's trace id.
/// </param>
/// <param name="timeoutSec">Timeout in seconds, converted through <c>TimeoutMs</c>.</param>
/// <param name="exchangeName">Exchange passed to the underlying publisher.</param>
/// <param name="ct">Cancellation token forwarded to the underlying publisher.</param>
/// <returns>The response delivered through the task stored in <c>callbackMapper</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
/// <exception cref="DuplicateWaitObjectException">The generated correlation id is already pending.</exception>
public async Task<BaseResponseMessageContext> SendAsync(string routingKey, IMessageContext message, int timeoutSec = 0, string exchangeName = "amq.topic", CancellationToken ct = default)
{
    // Validate first: message.GetType() used to run before this check, so a null message
    // surfaced as NullReferenceException and the guard could never fire for null.
    if (message == null)
        throw new ArgumentNullException(nameof(message));
    using (var tracerProvider = TracerProviderFactory.CreateTracerProvider())
    {
        ActivitySource.AddActivityListener(TraceActivityListener.CreateActivityListener());
        using (var activity = Activity.StartActivity("RabbitMq Publish", ActivityKind.Producer))
        {
            //override TrackingId with jaeger TraceId
            trackId = TracerProviderFactory.ConvertToGuidFormat(activity?.TraceId.ToHexString());
            PropertyInfo globalTrackIdProperty = message.GetType().GetProperty("TrackingId");
            if (globalTrackIdProperty != null && globalTrackIdProperty.CanWrite)
                globalTrackIdProperty.SetValue(message, trackId);
            //add trace context to header
            IBasicProperties props = Channel.CreateBasicProperties();
            AddActivityToHeader(activity, props);
            // NOTE(review): props (carrying the injected headers) is not passed to
            // Publisher.SendAsync below, so nothing in this method hands these headers to the
            // publish call. Publisher is not visible here — confirm how it builds its
            // IBasicProperties and forward props (or props.Headers) to it.
            string correlationId = RabbitMQExtension.GetCorrelationId();
            if (callbackMapper.ContainsKey(correlationId))
                throw new DuplicateWaitObjectException(nameof(correlationId), $"A message with this correlationId: {correlationId} has already been used. Please generate new ones.");
            try
            {
                TaskCompletionSource<BaseResponseMessageContext> tcs = new TaskCompletionSource<BaseResponseMessageContext>(TaskCreationOptions.RunContinuationsAsynchronously);
                if (!callbackMapper.TryAdd(correlationId, tcs))
                    throw new Exception($"CorrelationId: {correlationId} was not added");
                //setting max timeout
                CancellationTokenSource = new CancellationTokenSource(TimeoutMs(timeoutSec));
....
....
                Publisher.ReplyTo replyTo = new Publisher.ReplyTo()
                {
                    ExchangeName = Subscriber.ExchangeName,
                    RoutingKey = Subscriber.RoutingKey
                };
                Logger?.LogDebug("A message with correlationId: {correlationId} will be send. Please reply to {ExchangeName} -> {RoutingKey}", correlationId, replyTo.ExchangeName, replyTo.RoutingKey);
                await Publisher.SendAsync(routingKey, message, exchangeName, correlationId: correlationId, replyTo: replyTo, ct: ct, mandatory: true);
                var response = await tcs.Task;
                timeoutToken.Dispose();
                userToken.Dispose();
                return response;
            }
            catch (Exception ex)
            {
                Logger?.LogError(ex, "RpcPublisher.SendAsync catch exception with correlationId:{correlationId}", correlationId);
                // TrySetException: the pending task may already be completed (e.g. faulted or
                // cancelled), in which case SetException would throw and mask the real error.
                if (callbackMapper.TryRemove(correlationId, out var pending))
                    pending.TrySetException(ex);
                // Rethrow without resetting the stack trace.
                throw;
            }
        }
    }
}
/// <summary>
/// Injects the trace context of <paramref name="activity"/> together with the current
/// baggage into <paramref name="props"/> via <c>InjectContextIntoHeader</c>, then tags the
/// activity with the messaging attributes.
/// </summary>
/// <param name="activity">Activity to propagate; may be null.</param>
/// <param name="props">Message properties that receive the propagation headers.</param>
private void AddActivityToHeader(Activity activity, IBasicProperties props)
{
    // The activity can be null (the tag calls below already allow for it), so do not
    // dereference it directly. An empty context still lets the baggage be injected.
    ActivityContext context = activity?.Context ?? default(ActivityContext);
    Propagator.Inject(new PropagationContext(context, Baggage.Current), props, InjectContextIntoHeader);
    activity?.SetTag("messaging.system", "rabbitmq");
    activity?.SetTag("messaging.destination_kind", "queue");
    activity?.SetTag("messaging.rabbitmq.queue", "sample");
}
/// <summary>
/// Propagator setter callback: writes one key/value pair into the headers of
/// <paramref name="props"/>, creating the header dictionary on first use.
/// </summary>
/// <param name="props">Message properties whose headers are written.</param>
/// <param name="key">Header name.</param>
/// <param name="value">Header value.</param>
private void InjectContextIntoHeader(IBasicProperties props, string key, string value)
{
    try
    {
        if (props.Headers == null)
        {
            props.Headers = new Dictionary<string, object>();
        }
        props.Headers[key] = value;
    }
    catch (Exception ex)
    {
        // Logger is optional (the constructors default it to null), so guard the call;
        // otherwise a failed injection would turn into a NullReferenceException here.
        Logger?.LogInformation(ex, "Failed to inject trace context.");
    }
}
}
}
```
The trace context is being picked up successfully:
but even after stepping through the code, neither the context nor the tags are available in the headers:
As a test I added the SpanId and TraceId as a pair of headers as below:
IBasicProperties props = Channel.CreateBasicProperties();
// AddActivityToHeader(activity, props);
props.Headers = new Dictionary<string, object>();
props.Headers.Add("trace-id", activity?.TraceId.ToHexString());
props.Headers.Add("span-id", activity?.SpanId.ToHexString());
var check = props;
The properties are available in the message BasicProperties header but they are not available at the consumer end:
At the consumer end I am retrieving the headers via BasicDeliverEventArgs ea, but strangely enough the headers are null:
namespace Rabbit.Comms.Client
{
public class Subscriber<T> : IDisposable where T : IMessageContext
{
#region Dispose
private bool DisposeChannel { get; } = false;
private bool disposedValue;
protected virtual void Dispose(bool disposing)
{
...
}
public void Dispose()
{
...
}
public List<string> RoutingKeys { get; }
public string RoutingKey { get { return RoutingKeys.OrderByDescending(r => r.Length).FirstOrDefault(); } }
public string ExchangeName { get; }
internal IModel Channel { get; }
internal ILogger Logger { get; }
private static readonly ActivitySource Activity = new ActivitySource("Subscriber");
private static readonly TextMapPropagator Propagator = new TraceContextPropagator();
/// <summary>
/// Creates a subscriber for a single routing key from a connection by wrapping
/// <paramref name="routingKey"/> in a one-element list and delegating to the
/// connection/list constructor.
/// </summary>
public Subscriber(IConnection connection, string routingKey, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? prefetchCount = null, bool allowCancellation = true, ILogger logger = null)
: this(connection, new List<string>() { routingKey }, consumerFunction, subscriberExchangeName, prefetchCount, allowCancellation, logger)
{
}
/// <summary>
/// Creates a subscriber for a single routing key on an existing channel by wrapping
/// <paramref name="routingKey"/> in a one-element list and delegating to the
/// channel/list constructor.
/// </summary>
public Subscriber(IModel channel, string routingKey, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? prefetchCount = null, bool allowCancellation = true, ILogger logger = null)
: this(channel, new List<string>() { routingKey }, consumerFunction, subscriberExchangeName, prefetchCount, allowCancellation, logger)
{
}
/// <summary>
/// Creates a subscriber for several routing keys on a new channel opened from
/// <paramref name="connection"/>, delegating the setup to the channel/list constructor.
/// </summary>
public Subscriber(IConnection connection, List<string> routingKeys, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? prefetchCount = null, bool allowCancellation = true, ILogger logger = null)
: this(connection.CreateModel(), routingKeys, consumerFunction, subscriberExchangeName, prefetchCount, allowCancellation, logger)
{
// The channel was created here rather than supplied by the caller, so record that fact
// (presumably consulted by Dispose, whose body is not shown — confirm).
DisposeChannel = true;
}
/// <summary>
/// Creates a subscriber on an existing channel: declares the queue (named after
/// <see cref="RoutingKey"/>), binds every routing key to <paramref name="subscriberExchangeName"/>,
/// and starts a consumer that extracts the trace context from each delivery, starts a
/// consumer activity, deserializes the body, acknowledges the delivery and then invokes
/// <paramref name="consumerFunction"/>.
/// </summary>
/// <param name="channel">Channel used for QoS, queue setup and consuming.</param>
/// <param name="routingKeys">Routing keys to bind; each is cleaned via <c>RabbitMQExtension.CleanRoutingKey</c>.</param>
/// <param name="consumerFunction">Callback invoked with the deserialized message, the delivery and a cancellation token.</param>
/// <param name="subscriberExchangeName">Exchange the queue is bound to.</param>
/// <param name="prefetchCount">When set, applied through <c>BasicQos</c>.</param>
/// <param name="allowCancellation">When true, <c>CancelQueue</c> is called for the queue name.</param>
/// <param name="logger">Optional logger; may be null.</param>
public Subscriber(IModel channel, List<string> routingKeys, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? prefetchCount = null, bool allowCancellation = true, ILogger logger = null)
{
    // The logger parameter was previously dropped, leaving Logger permanently null so that
    // nothing was ever logged and unguarded Logger calls threw NullReferenceException.
    Logger = logger;
    ExchangeName = subscriberExchangeName;
    RoutingKeys = routingKeys.Select(rk => RabbitMQExtension.CleanRoutingKey(rk)).ToList();
    string queueName = RoutingKey;
    Channel = channel;
    if (prefetchCount != null)
        Channel.BasicQos(0, prefetchCount.Value, false);
    if (allowCancellation)
        CancelQueue(queueName);
    Channel.CreateQueue(queueName);
    foreach (string route in RoutingKeys)
        Channel.QueueBind(queueName, ExchangeName, route);
    EventingBasicConsumer consumer = new EventingBasicConsumer(Channel);
    if (Channel.DefaultConsumer == null)
        Channel.DefaultConsumer = consumer;
    consumer.Received += async (object sender, BasicDeliverEventArgs ea) =>
    {
        try
        {
            var parentContext = Propagator.Extract(default, ea.BasicProperties, ExtractTraceContextFromBasicProperties);
            Baggage.Current = parentContext.Baggage;
            //Start a new Activity
            using (var activity = Activity.StartActivity("Process Message", ActivityKind.Consumer, parentContext.ActivityContext))
            {
                CancellationTokenSource cts = new CancellationTokenSource();
                CancellationTokenSource.Add(ea.BasicProperties.CorrelationId, cts);
                T messageData = RabbitMQExtension.DeserializeObject<T>(ea.Body.Span.ToArray());
                // The delivery is acknowledged before the consumer function runs.
                Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                await consumerFunction(messageData, ea, cts.Token);
            }
        }
        catch (Exception ex)
        {
            Logger?.LogError(ex, "Subscriber.Received ended with exception correlationId:{correlationId}", ea.BasicProperties.CorrelationId);
            // Rethrow without resetting the stack trace.
            throw;
        }
    };
    Channel.BasicConsume(queueName, false, consumer);
}
...
...
/// <summary>
/// Propagator getter callback: returns the value stored under <paramref name="key"/> in the
/// headers of <paramref name="props"/>, or an empty sequence when it is absent or unreadable.
/// </summary>
/// <param name="props">Properties of the received message; may have no headers.</param>
/// <param name="key">Header name requested by the propagator.</param>
/// <returns>A single-element sequence with the header text, or an empty sequence.</returns>
public IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperties props, string key)
{
    try
    {
        // Headers is null on messages published without headers; treat that as "not found"
        // instead of relying on a caught NullReferenceException.
        if (props?.Headers != null && props.Headers.TryGetValue(key, out var value))
        {
            // Only decode when the value really is a byte array; an unchecked "as" cast
            // handed null to GetString for any other value type.
            if (value is byte[] bytes)
                return new[] { Encoding.UTF8.GetString(bytes) };
            if (value is string text)
                return new[] { text };
        }
    }
    catch (Exception ex)
    {
        // Logger may be null, so guard the call to keep the failure path from throwing itself.
        Logger?.LogInformation($"Failed to extract trace context: {ex}");
    }
    return Enumerable.Empty<string>();
}
/// <summary>
/// Tags <paramref name="activity"/> with the messaging system, destination kind and queue
/// attributes. Does nothing when <paramref name="activity"/> is null.
/// </summary>
/// <param name="activity">Activity to tag; may be null.</param>
public static void AddActivityTags(Activity activity)
{
    // SetTag returns the activity itself, so the three tags can be applied as one chain.
    activity?
        .SetTag("messaging.system", "rabbitmq")
        .SetTag("messaging.destination_kind", "queue")
        .SetTag("messaging.rabbitmq.queue", "sample");
}
}
}
I am implementing this based on the example given here
What am I missing?