Using KafkaStreams with Confluent Interceptor


John Hatcher

Jul 12, 2016, 2:17:25 PM
to Confluent Platform
Is this possible? I see that you can configure your consumers and producers to use the Confluent Monitoring Interceptor, but there doesn't seem to be a way to do this when consuming via KafkaStreams as opposed to an explicit Kafka Consumer.

Guozhang Wang

Jul 13, 2016, 4:20:15 PM
to Confluent Platform
Hello John,

You can just pass the corresponding config properties to the StreamsConfig, for example:

// these are streams specific configs
cfg.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
cfg.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
cfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// these are underlying clients configs
cfg.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
cfg.put(KafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");
cfg.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "MyInterceptor");



Bear in mind that if you want an interceptor for both the producer and the consumer, and the two interceptors are configured under the same config key, there will be a conflict. We are working on this right now: https://issues.apache.org/jira/browse/KAFKA-3929


Guozhang

Robert

Aug 5, 2016, 10:58:21 AM
to Confluent Platform
Hi Guozhang,

I've tried the suggestion you mentioned here:

cfg.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "MyConsumerInterceptor");


But ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG has the value "interceptor.classes", and this will crash the stream.
It tries to instantiate the producer with this consumer interceptor class, causing a 'java.lang.ClassCastException'.

If I change the config key to "consumer.interceptor.classes" with my (consumer) interceptor class, it is ignored altogether and the interceptor is never called. I verified this by changing the interceptor to a producer interceptor (which should crash the stream), but it runs fine.
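
For readers following along, the failure described above can be reproduced in miniature without Kafka. The sketch below is only an illustration under stated assumptions: `ProducerSide`, `ConsumerSide`, and `instantiate` are hypothetical stand-ins, not Kafka's own classes or loading code. It shows why a single shared class-name setting fails once a second client expects a different interface.

```java
// Illustrative sketch only; none of these types are part of Kafka.
class SharedInterceptorKeySketch {
    // Hypothetical stand-ins for the producer and consumer interceptor contracts.
    interface ProducerSide {}
    interface ConsumerSide {}

    // A class that only satisfies the consumer-side contract.
    static class MyConsumerInterceptor implements ConsumerSide {}

    // Loads a class by name and insists that it implements the expected contract.
    // asSubclass throws ClassCastException when the class does not match.
    static <T> T instantiate(String className, Class<T> expected) {
        try {
            Class<?> raw = Class.forName(className);
            return raw.asSubclass(expected).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // One shared setting, handed to both clients.
        String shared = MyConsumerInterceptor.class.getName();

        instantiate(shared, ConsumerSide.class); // fine: the contract matches
        try {
            instantiate(shared, ProducerSide.class); // the producer side rejects it
        } catch (ClassCastException e) {
            System.out.println("Producer side failed: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the class itself is fine; the problem is that one un-prefixed key is visible to both clients.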

Any ideas on how to add interceptors to a stream? 

Thanks!

Best,

Robert.

Guozhang Wang

Aug 5, 2016, 5:42:22 PM
to Confluent Platform
Hi Robert,

We noticed this issue as well and have tried to fix it in trunk with a prefix:


Does this sound good to you?

Guozhang

Guozhang Wang

Aug 5, 2016, 5:43:18 PM
to Confluent Platform
BTW it will be included in the next minor release (i.e. not in the coming CP 3.0.1, but in 3.1.0 or the like).
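
As a rough illustration of the prefix idea discussed in this thread, the sketch below splits one flat config into per-client views using "consumer." and "producer." prefixes. It is a minimal sketch, not the Kafka Streams implementation: the class name, the `forClient` helper, and the exact override rules are assumptions made for the example, and the real rules may differ between versions.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the Kafka Streams source.
class PrefixedConfigSketch {
    static final String CONSUMER_PREFIX = "consumer.";
    static final String PRODUCER_PREFIX = "producer.";

    // Builds the config view for one embedded client (assumed rules):
    //  1. copy every key that carries neither prefix (shared settings),
    //  2. copy keys carrying this client's prefix with the prefix removed,
    //     letting them override shared settings,
    //  3. keys carrying the other client's prefix are never copied.
    static Map<String, Object> forClient(Map<String, Object> all, String ownPrefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : all.entrySet()) {
            String key = e.getKey();
            if (!key.startsWith(CONSUMER_PREFIX) && !key.startsWith(PRODUCER_PREFIX)) {
                result.put(key, e.getValue());
            }
        }
        for (Map.Entry<String, Object> e : all.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(ownPrefix)) {
                result.put(key.substring(ownPrefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = new HashMap<>();
        cfg.put("bootstrap.servers", "broker1:9092");
        cfg.put("consumer.interceptor.classes", "MyConsumerInterceptor");
        cfg.put("producer.interceptor.classes", "MyProducerInterceptor");

        // Each client sees only its own interceptor under "interceptor.classes".
        System.out.println(forClient(cfg, CONSUMER_PREFIX));
        System.out.println(forClient(cfg, PRODUCER_PREFIX));
    }
}
```

With this kind of routing, each client receives only its own interceptor class. In releases that include the prefix support, StreamsConfig is expected to offer helpers such as consumerPrefix(...) for building these keys; check the javadoc of the version you run.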

Robert

Aug 9, 2016, 5:55:59 AM
to Confluent Platform
Sounds great! Can't wait to try it out :) 

Robert

Sep 30, 2016, 8:11:47 AM
to Confluent Platform
I've been trying to configure the interceptors using kafka trunk, but I'm still having issues.

I want to pass a configuration to my interceptor. We've been doing this successfully with Kafka Connect, where the config looks like:

(worker.properties)

```
consumer.interceptor.classes=com.appsignal.kafka.StatsdConsumerInterceptor
consumer.interceptor.statsd.host=localhost
```

But when I try to pass an interceptor configuration with Kafka Streams, those variables never make it to the interceptor config.
The only values that are passed to the interceptor from the stream config are the ConsumerConfig variables defined here: https://www.apache.org/dist/kafka/0.10.0.0/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html

This also doesn't work when I try to set the config programmatically:

```
streamsConfiguration.put("consumer.interceptor.statsd.host", "localhost");
// or
streamsConfiguration.put("interceptor.statsd.host", "localhost");
```
Is there any way to pass variables to the interceptor through the StreamsConfig, the way it works with the Connect config?
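
For context, an interceptor normally picks up such a custom setting in its configure(Map<String, ?>) method, which Kafka's interceptor interfaces inherit from org.apache.kafka.common.Configurable. The sketch below shows only that lookup in plain Java so it runs without Kafka on the classpath. The class name, the key "interceptor.statsd.host" (the worker.properties key with the "consumer." prefix stripped), and the fallback value are assumptions for illustration, not taken from the actual StatsdConsumerInterceptor.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the configuration half of an interceptor. A real one would
// implement ConsumerInterceptor and receive this call from the consumer.
class StatsdInterceptorConfigSketch {
    // Assumed key as seen by the interceptor once any client prefix is removed.
    static final String HOST_KEY = "interceptor.statsd.host";
    // Hypothetical fallback used when the key never reaches the interceptor.
    static final String DEFAULT_HOST = "localhost";

    private String statsdHost = DEFAULT_HOST;

    // Same shape as Configurable.configure: read what is needed, ignore the rest.
    public void configure(Map<String, ?> configs) {
        Object host = configs.get(HOST_KEY);
        statsdHost = (host != null) ? host.toString() : DEFAULT_HOST;
    }

    String statsdHost() {
        return statsdHost;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(HOST_KEY, "statsd.internal");

        StatsdInterceptorConfigSketch interceptor = new StatsdInterceptorConfigSketch();
        interceptor.configure(configs);
        System.out.println("statsd host = " + interceptor.statsdHost());
    }
}
```

Logging the full map inside configure is a quick way to see which keys actually arrive, which is how the missing custom keys described above would show up.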

Damian Guy

Sep 30, 2016, 11:21:22 AM
to Confluent Platform
Hi Robert,

You are correct, this currently doesn't work. I'm not sure there is a workaround (someone else might know?).
I have submitted a PR for it.

Thanks,
Damian

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/e243a811-c0b1-46a3-94ed-bde1d55e19fd%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Robert

Oct 2, 2016, 1:54:33 PM
to Confluent Platform
Hi Damian,

I just tested my interceptor with the code from the 0.10.1 branch and it works perfectly!

Thanks for your support!

Best,

Robert.

Damian Guy

Oct 3, 2016, 4:16:10 AM
to Confluent Platform
Hi Robert,
No problems. Thanks for the confirmation.

Cheers,
Damian
