[opentracing][reactive-messaging] Passing tracing information from jax-rs resource to emitted kafka message

332 views
Skip to first unread message

Christof

unread,
Jan 5, 2021, 11:53:16 AM1/5/21
to SmallRye
Hi there

we use quarkus-opentracing, quarkus-smallrye-reactive-messaging-kafka and opentracing-kafka-client to pass tracing headers through kafka messages.

however, sending the tracing headers from a jax-rs resource to a kafka message does not seem to be working as we expect. 

@ApplicationScoped
public class MsgProducer {

    @Inject
    @Channel("msg")
    Emitter<CustomObject> emitter;

    public void send(CustomObject obj) {
        // trace exists here
        emitter.send(obj);
    }
}

calling send() from rest-resource does not inject the tracing header in the emitted kafka message.

Snippet from Application.properties
mp.messaging.outgoing.msg.connector=smallrye-kafka
mp.messaging.outgoing.msg.topic=topic
mp.messaging.outgoing.msg.incerceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor

Is there anything I missed?

thanks in advance

Christof

clement escoffier

unread,
Jan 5, 2021, 12:46:36 PM1/5/21
to SmallRye, Ken Finnigan
Hello,

Do you use context propagation? 
@Ken Finnigan  do you have an idea?

We don't do anything special for the emitter. Maybe we should, but I was thinking that context propagation would do the job.

Clement


--
You received this message because you are subscribed to the Google Groups "SmallRye" group.
To unsubscribe from this group and stop receiving emails from it, send an email to smallrye+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/smallrye/3bcb1a48-a15d-41d4-af04-8679cb3d3baen%40googlegroups.com.

Ken Finnigan

unread,
Jan 5, 2021, 1:32:15 PM1/5/21
to clement escoffier, SmallRye
I actually did this for a book I'm writing with John Clingan about Quarkus and MicroProfile.

For now, you will need to do something like:

      RecordHeaders headers = new RecordHeaders();
      TracingKafkaUtils.inject(tracer.activeSpan().context(), headers, tracer);
      OutgoingKafkaRecordMetadata<Object> kafkaMetadata = OutgoingKafkaRecordMetadata.builder()
          .withHeaders(headers)
          .build();
      emitter.send(Message.of(payload, Metadata.of(kafkaMetadata)));

TracingKafkaUtils is from opentracing-kafka-client and is needed because there is no automatic correlation between the existing span and the emitter.
Also, the tracer instance is just from @Inject Tracer.

Ken

Christof

unread,
Jan 5, 2021, 4:28:18 PM1/5/21
to SmallRye
hey ken and clement

i tried both version (with using context-propagation) and also with kens version. context-propagation didn't change anything.

with ken's version the kafka message is like this:

Key (4 bytes): null
Value (191 bytes): {"ack":{"":{"cancelled":false,"completedExceptionally":false,"done":true,"numberOfDependents":0}},"metadata":{},"nack":{},"payload":<JSONPAYLOAD>
Timestamp: 1609880473781
Partition: 0
Offset: 6
Headers: 

Headers are still empty. I'm on Quarkus 1.10.5. 

i have another service which is consuming from this topic i mentioned above (but this time using @Incoming) there the injecting works fine with just opentracing-kafka-client and
mp.messaging.outgoing.shop-order-confirmation.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor

code is the following

    @Inject
    @Channel("topic-reply")
    Emitter<CustomObject2> anotherEmitter;


    @Incoming("topic")
    public CompletionStage<Void> consume(KafkaRecord<String, CustomObject> message) {
        return CompletableFuture.runAsync(() -> {
            try (Scope scope = tracer.buildSpan("consume").asChildOf(TracingKafkaUtils.extractSpanContext(message.getHeaders(), tracer)).startActive(true)) {
                anotherEmitter.send(CustomObject2)
            } catch (Exception e) {
                logger.info("extractSpanContext", e);
            }
        }).thenRun(message::ack);
    }

The message in topic-reply where a tracing header is injected (trace is a new one, but it is injected):

Key (4 bytes): null
Value (58 bytes): <JSONPAYLOAD>
Timestamp: 1609881056709
Partition: 0
Offset: 3
Headers: uber-trace-id=d9076d9ab77dc8d4:d9076d9ab77dc8d4:0:1

any other ideas?

Ken Finnigan

unread,
Jan 6, 2021, 4:07:17 PM1/6/21
to SmallRye
On Tuesday, January 5, 2021 at 4:28:18 PM UTC-5 Christof wrote:
hey ken and clement

i tried both version (with using context-propagation) and also with kens version. context-propagation didn't change anything.

with ken's version the kafka message is like this:

Key (4 bytes): null
Value (191 bytes): {"ack":{"":{"cancelled":false,"completedExceptionally":false,"done":true,"numberOfDependents":0}},"metadata":{},"nack":{},"payload":<JSONPAYLOAD>
Timestamp: 1609880473781
Partition: 0
Offset: 6
Headers: 


Forgot to mention that the solution I had still needed the TracingConsumerInterceptor added to the channel. Was that added?

Ken Finnigan

unread,
Jan 6, 2021, 4:09:34 PM1/6/21
to SmallRye
Sorry, needs the producer interceptor, not the consumer one!

You received this message because you are subscribed to a topic in the Google Groups "SmallRye" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/smallrye/F4MXSjtsXgc/unsubscribe.
To unsubscribe from this group and all its topics, send an email to smallrye+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/smallrye/d3870526-67ee-467c-a31d-ef684aca2618n%40googlegroups.com.

Christof

unread,
Jan 7, 2021, 2:32:59 PM1/7/21
to SmallRye
Not 100% sure TracingProducerInterceptor was added. However, we got it working with:


@Inject
@Channel("topic")
Emitter<CustomObject> emitter;

public void createRequest(CustomObject customObject) {
    HeadersMapInjectAdapter headersMapInjectAdapter = new HeadersMapInjectAdapter();
    
    try (Scope scope = tracer.buildSpan("request").startActive(true)) {
        tracer.inject(scope.span().context(), Format.Builtin.TEXT_MAP, headersMapInjectAdapter);
        
        OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.<CustomObject>builder()
                .withKey(customObject)
                .withTopic("topic")
                .withHeaders(headersMapInjectAdapter.getRecordHeaders())
                .build();
        Message<CustomObject> message = Message.of(customObject, Metadata.of(metadata));

        emitter.send(message);
    }
}

I think that's more or less the same as you did Ken. For the records TracingProducerInterceptor is added to the channel. Will try again your method if i have time as it seems cleaner.

Thank you.

Ken Finnigan

unread,
Jan 7, 2021, 2:39:30 PM1/7/21
to SmallRye
Great to hear you got it working

Reply all
Reply to author
Forward
0 new messages