RabbitMQ .Net Client not adding Jaeger trace context to Headers

124 views
Skip to first unread message

Edmore Tshuma

unread,
Sep 22, 2023, 9:39:19 AM9/22/23
to rabbitmq-users
I have a .Net Class Library project which implements RabbitMQ producers and consumers. I have added OpenTelemetry/Jaeger trace context to the message headers but at runtime when I inspect the message's Headers property it is null.

The tracing context is being generated correctly but it seems the trace context, tags as well as baggage are all not being added to the headers.

This is the publisher code (truncated for brevity) :

```
namespace Rabbit.Comms.Client
{
    public class RpcPublisher : IDisposable
    {
   
    protected virtual void Dispose(bool disposing)
    {
       ...
       ...
    }

     public void Dispose()
    {
        ...
        GC.SuppressFinalize(this);
    }
    #endregion Dispose

    internal Subscriber<BaseResponseMessageContext> Subscriber { get; }
    internal Publisher Publisher { get; }
    internal IModel Channel { get; }
    internal ILogger Logger { get; }
    private bool DisposeChannel { get; } = false;

    private readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
    private readonly ActivitySource Activity = new ActivitySource(nameof(Publisher));
    private static string trackId = null;

    private readonly ConcurrentDictionary<string, TaskCompletionSource<BaseResponseMessageContext>> callbackMapper =
            new ConcurrentDictionary<string, TaskCompletionSource<BaseResponseMessageContext>>();

    public RpcPublisher(IConnection connection, string subscriberExchangeName = "amq.direct", ILogger logger = null) : this(connection.CreateModel(), subscriberExchangeName, logger)
    {
        DisposeChannel = true;
    }

    public RpcPublisher(IModel channel, string subscriberExchangeName = "amq.direct", ILogger logger = null)
    {
        Logger = logger;
        Channel = channel;
        Channel.BasicReturn += Channel_BasicReturn;
        Publisher = new Publisher(channel);
        Subscriber = new Subscriber<BaseResponseMessageContext>(channel, RabbitMQExtension.GetDefaultSubscriberRoutingKey, ConsumerFunctionAsync, subscriberExchangeName ?? RabbitMQExtension.GetDefaultSubscriberExchangeName, allowCancellation: false, logger: logger);
    }
   ...
   ....

    private CancellationTokenSource CancellationTokenSource { get; set; } = null;

    /// <summary>
    /// Publishes <paramref name="message"/> as an RPC request and awaits the response task
    /// registered in <c>callbackMapper</c> under a freshly generated correlation id.
    /// The whole operation runs inside a "RabbitMq Publish" producer activity.
    /// </summary>
    /// <param name="routingKey">Routing key passed to <c>Publisher.SendAsync</c>.</param>
    /// <param name="message">Message to publish; its writable <c>TrackingId</c> property (if any) is overwritten with the trace id.</param>
    /// <param name="timeoutSec">Timeout in seconds, converted with <c>TimeoutMs</c> for the cancellation token source.</param>
    /// <param name="exchangeName">Exchange passed to <c>Publisher.SendAsync</c>.</param>
    /// <param name="ct">Cancellation token passed to <c>Publisher.SendAsync</c>.</param>
    /// <returns>The response obtained by awaiting the registered <c>TaskCompletionSource</c>.</returns>
    /// <exception cref="ArgumentNullException">Thrown by the explicit check when <paramref name="message"/> is null (but see the review note on that check).</exception>
    /// <exception cref="DuplicateWaitObjectException">Thrown when the generated correlation id is already present in <c>callbackMapper</c>.</exception>
    public async Task<BaseResponseMessageContext> SendAsync(string routingKey, IMessageContext message, int timeoutSec = 0, string exchangeName = "amq.topic", CancellationToken ct = default)
    {

        // NOTE(review): a tracer provider is created and disposed on every call.
        using (var tracerProvider = TracerProviderFactory.CreateTracerProvider())
        {

            // NOTE(review): a new listener is registered on every call and nothing visible here removes it.
            ActivitySource.AddActivityListener(TraceActivityListener.CreateActivityListener());

            // 'Activity' here is this class's ActivitySource field, not the Activity type.
            using (var activity = Activity.StartActivity("RabbitMq Publish", ActivityKind.Producer))
            {
                // Override TrackingId with the Jaeger trace id.
                // NOTE(review): trackId is a static field, so it is shared by all instances and concurrent calls.
                // NOTE(review): 'activity' is accessed with '?.' here, i.e. it is treated as possibly null.
                trackId = TracerProviderFactory.ConvertToGuidFormat(activity?.TraceId.ToHexString());
                // NOTE(review): message.GetType() runs before the null check further down, so a null
                // message fails here with NullReferenceException instead of ArgumentNullException.
                PropertyInfo globalTrackIdProperty = message.GetType().GetProperty("TrackingId");
                if (globalTrackIdProperty != null && globalTrackIdProperty.CanWrite)
                    globalTrackIdProperty.SetValue(message, trackId);
               
                // Add the trace context to the header of a fresh properties object.
                // NOTE(review): 'props' is populated here but is never passed to Publisher.SendAsync
                // below, so - unless Publisher obtains it some other way not visible here - the
                // injected headers are not attached to the published message.
                IBasicProperties props = Channel.CreateBasicProperties();
                AddActivityToHeader(activity, props);
                 
                //test with ordinary headers
                //props.Headers = new Dictionary<string, object>();
                //props.Headers["Name"] = "Bob";
                //props.Headers["Age"] = 21;

                if (message == null)
            throw new ArgumentNullException(nameof(message));

        string correlationId = RabbitMQExtension.GetCorrelationId();
        if (callbackMapper.ContainsKey(correlationId))
            throw new DuplicateWaitObjectException(nameof(correlationId), $"A message with this correlationId: {correlationId} has already been used. Please generate new ones.");
        try
        {
            // Register the pending response before publishing so it can be looked up by correlation id.
            TaskCompletionSource<BaseResponseMessageContext> tcs = new TaskCompletionSource<BaseResponseMessageContext>(TaskCreationOptions.RunContinuationsAsynchronously);
            if (!callbackMapper.TryAdd(correlationId, tcs))
                throw new Exception($"CorrelationId: {correlationId} was not added");

            //setting max timeout
            CancellationTokenSource = new CancellationTokenSource(TimeoutMs(timeoutSec));
           ....
           ....

            // Replies are addressed to this instance's subscriber exchange and routing key.
            Publisher.ReplyTo replyTo = new Publisher.ReplyTo()
            {
                ExchangeName = Subscriber.ExchangeName,
                RoutingKey = Subscriber.RoutingKey
            };

            Logger?.LogDebug("A message with correlationId: {correlationId} will be send. Please reply to {ExchangeName} -> {RoutingKey}", correlationId, replyTo.ExchangeName, replyTo.RoutingKey);

            await Publisher.SendAsync(routingKey, message, exchangeName, correlationId: correlationId, replyTo: replyTo, ct: ct, mandatory: true);

            // Wait for the response task to complete (it is completed outside the code shown here).
            var test = await tcs.Task;
            // NOTE(review): timeoutToken and userToken are declared in the elided section above;
            // they are only disposed on this success path, not when an exception is thrown.
            timeoutToken.Dispose();
            userToken.Dispose();
            return test;

        }
        catch (Exception ex)
        {
            Logger?.LogError(ex, "RpcPublisher.SendAsync catch exception with correlationId:{correlationId}", correlationId);
            // Drop the pending entry and fault its task with the same exception.
            if (callbackMapper.TryRemove(correlationId, out var tmp))
                tmp.SetException(ex);
            // NOTE(review): 'throw ex;' resets the stack trace; a bare 'throw;' would preserve it.
            throw ex;
        }
         }
       }
    }

    private void AddActivityToHeader(Activity activity, IBasicProperties props)
    {
        Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, InjectContextIntoHeader);
        activity?.SetTag("messaging.system", "rabbitmq");
        activity?.SetTag("messaging.destination_kind", "queue");
        activity?.SetTag("messaging.rabbitmq.queue", "sample");
    }

    private void InjectContextIntoHeader(IBasicProperties props, string key, string value)
    {
        try
        {
            if (props.Headers == null)
            {
                props.Headers = new Dictionary<string, object>();
            }
            props.Headers[key] = value;
        }
        catch (Exception ex)
        {
            Logger.LogInformation(ex, "Failed to inject trace context.");
        }
    }
}
}
```
The trace context is being picked up successfully:
trace


but even after stepping through, neither the context nor the tags are available in the headers:

headers

As a test I added the SpanId and TraceId as a pair of headers as below:

IBasicProperties props = Channel.CreateBasicProperties(); // AddActivityToHeader(activity, props); props.Headers = new Dictionary<string, object>(); props.Headers.Add("trace-id", activity?.TraceId.ToHexString()); props.Headers.Add("span-id", activity?.SpanId.ToHexString()); var check = props;

The properties are available in the message BasicProperties header but they are not available at the consumer end:

addheader

At the consumer end I am retrieving the headers via BasicDeliverEventArgs ea, but strangely enough the headers are null:

                  namespace Rabbit.Comms.Client { public class Subscriber<T> : IDisposable where T : IMessageContext { #region Dispose private bool DisposeChannel { get; } = false; private bool disposedValue; protected virtual void Dispose(bool disposing) { ... } public void Dispose() { ... } public List<string> RoutingKeys { get; } public string RoutingKey { get { return RoutingKeys.OrderByDescending(r => r.Length).FirstOrDefault(); } } public string ExchangeName { get; } internal IModel Channel { get; } internal ILogger Logger { get; } private static readonly ActivitySource Activity = new ActivitySource("Subscriber"); private static readonly TextMapPropagator Propagator = new TraceContextPropagator(); public Subscriber(IConnection connection, string routingKey, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? prefetchCount = null, bool allowCancellation = true, ILogger logger = null) : this(connection, new List<string>() { routingKey }, consumerFunction, subscriberExchangeName, prefetchCount, allowCancellation, logger) { } public Subscriber(IModel channel, string routingKey, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? prefetchCount = null, bool allowCancellation = true, ILogger logger = null) : this(channel, new List<string>() { routingKey }, consumerFunction, subscriberExchangeName, prefetchCount, allowCancellation, logger) { } public Subscriber(IConnection connection, List<string> routingKeys, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? 
prefetchCount = null, bool allowCancellation = true, ILogger logger = null) : this(connection.CreateModel(), routingKeys, consumerFunction, subscriberExchangeName, prefetchCount, allowCancellation, logger) { DisposeChannel = true; } public Subscriber(IModel channel, List<string> routingKeys, Func<T, BasicDeliverEventArgs, CancellationToken, Task> consumerFunction, string subscriberExchangeName = "amq.topic", ushort? prefetchCount = null, bool allowCancellation = true, ILogger logger = null) { ExchangeName = subscriberExchangeName; RoutingKeys = routingKeys.Select(rk => RabbitMQExtension.CleanRoutingKey(rk)).ToList(); string queueName = RoutingKey; Channel = channel; if (prefetchCount != null) Channel.BasicQos(0, prefetchCount.Value, false); if (allowCancellation) CancelQueue(queueName); Channel.CreateQueue(queueName); foreach(string route in RoutingKeys) Channel.QueueBind(queueName, ExchangeName, route); EventingBasicConsumer consumer = new EventingBasicConsumer(Channel); if (Channel.DefaultConsumer == null) Channel.DefaultConsumer = consumer; consumer.Received += async (object sender, BasicDeliverEventArgs ea) => { try { var parentContext = Propagator.Extract(default, ea.BasicProperties, ExtractTraceContextFromBasicProperties); Baggage.Current = parentContext.Baggage; //Start a new Activity using (var activity = Activity.StartActivity("Process Message", ActivityKind.Consumer, parentContext.ActivityContext)) { CancellationTokenSource cts = new CancellationTokenSource(); CancellationTokenSource.Add(ea.BasicProperties.CorrelationId, cts); T messageData = RabbitMQExtension.DeserializeObject<T>(ea.Body.Span.ToArray()); Channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); await consumerFunction(messageData, ea, cts.Token); } } catch (Exception ex) { Logger?.LogError(ex, "Subscriber.Received ended with exception correlationId:{correlationId}", ea.BasicProperties.CorrelationId); throw ex; } }; Channel.BasicConsume(queueName, false, consumer); } ... ... 
/// <summary>
/// Propagator getter callback: reads one propagation entry from the message headers.
/// </summary>
/// <param name="props">Properties of the delivered message; its <c>Headers</c> may be null.</param>
/// <param name="key">Header name requested by the propagator.</param>
/// <returns>
/// A single-element sequence with the UTF-8 decoded header value when the header exists
/// and holds a byte array; otherwise an empty sequence.
/// </returns>
public IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperties props, string key)
{
    try
    {
        // A message published without headers arrives with Headers == null; the original
        // dereferenced it unconditionally and relied on the catch block below.
        var headers = props?.Headers;
        if (headers != null && headers.TryGetValue(key, out var value))
        {
            // Only byte-array values are decoded. The original passed the result of
            // 'value as byte[]' straight to GetString, which throws on any other type.
            if (value is byte[] bytes)
            {
                return new[] { Encoding.UTF8.GetString(bytes) };
            }
        }
    }
    catch (Exception ex)
    {
        // Logger can be null, so the call must be null-conditional; otherwise the
        // handler throws and the failure escapes into the propagator.
        Logger?.LogInformation($"Failed to extract trace context: {ex}");
    }

    return Enumerable.Empty<string>();
}

/// <summary>
/// Adds the messaging tags to <paramref name="activity"/>; does nothing when it is null.
/// </summary>
/// <param name="activity">Activity to tag; may be <c>null</c>.</param>
public static void AddActivityTags(Activity activity)
{
    activity?.SetTag("messaging.system", "rabbitmq");
    activity?.SetTag("messaging.destination_kind", "queue");
    activity?.SetTag("messaging.rabbitmq.queue", "sample");
}
}
}

I am implementing this based on the example given here

What am I missing?

Luke Bakken

unread,
Sep 22, 2023, 1:48:14 PM9/22/23
to rabbitmq-users
Hello,

Please continue the discussion here:


Thanks,
Luke

Reply all
Reply to author
Forward
0 new messages