Kafka Connect Sink Connector consumer group configuration


Макс Сахаров

Jul 9, 2018, 6:57:26 PM
to Confluent Platform
I have a sink connector deployed to a Kafka Connect cluster. The connector processes infrequent jobs: messages may arrive on the topic only once every 2 weeks. When no messages arrive for 7 days, my messages get reprocessed from the beginning of the topic. This happens because offsets.retention.minutes is set to 7 days. In other words, if nothing is published for 7 days, the committed offsets are dropped and the consumer starts reading from the beginning. I am not able to change offsets.retention.minutes on the broker side. I also have to keep log.retention.* for the topic higher than offsets.retention.minutes.
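The behaviour described above can be sketched as a small model. This is only an illustration, assuming that a committed offset is dropped once the retention period has elapsed since the last commit; the class and method names are hypothetical and not part of Kafka.

```java
import java.time.Duration;

public class OffsetExpirySketch {
    // Simplified assumption: a committed offset counts as expired once more
    // time than the retention period has passed since the last commit.
    static boolean committedOffsetExpired(Duration sinceLastCommit, Duration offsetsRetention) {
        return sinceLastCommit.compareTo(offsetsRetention) > 0;
    }

    // Where the consumer starts: at the committed offset if it still exists,
    // otherwise wherever auto.offset.reset points.
    static String startPosition(Duration sinceLastCommit, Duration offsetsRetention, String autoOffsetReset) {
        return committedOffsetExpired(sinceLastCommit, offsetsRetention)
                ? autoOffsetReset
                : "committed";
    }

    public static void main(String[] args) {
        Duration retention = Duration.ofDays(7); // offsets.retention.minutes = 10080
        // Messages 3 days apart: the committed offset is still there.
        System.out.println(startPosition(Duration.ofDays(3), retention, "earliest"));
        // Messages 14 days apart: the offset is gone, so "earliest" applies.
        System.out.println(startPosition(Duration.ofDays(14), retention, "earliest"));
    }
}
```

With a 2-week gap and 7-day retention, the second case applies, which matches the reprocessing from the beginning of the topic.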

I also found in the Kafka Connect codebase that the consumer for a sink connector has hardcoded settings. org.apache.kafka.connect.runtime.WorkerSinkTask creates a consumer with the following settings:

        Map<String, Object> props = new HashMap();
        props.put("group.id", SinkUtils.consumerGroupId(this.id.connector()));
        props.put("bootstrap.servers", Utils.join(this.workerConfig.getList("bootstrap.servers"), ","));
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.putAll(this.workerConfig.originalsWithPrefix("consumer."));

        try {
            KafkaConsumer<byte[], byte[]> newConsumer = new KafkaConsumer(props);
            return newConsumer;
        } catch (Throwable var4) {
            throw new ConnectException("Failed to create consumer", var4);
        }

The enable.auto.commit and auto.offset.reset lines show that auto commit is disabled and that the consumer resets to the earliest offset whenever no committed offset exists. It would be great to be able to set auto.offset.reset from the sink connector configuration, or even from the Kafka cluster configuration.

My Kafka cluster is version 0.10. Kafka Connect runs from the Docker image confluentinc/cp-kafka-connect:4.0.0.

Does anybody have the same issue? Are there any suggestions on how to tackle it?

Макс Сахаров

Jul 11, 2018, 2:50:04 PM
to Confluent Platform
Looks like I missed this part:

props.putAll(this.workerConfig.originalsWithPrefix("consumer."));

I can pass consumer options in the worker configuration using the 'consumer.' prefix, which should let me set the consumer configuration I need.
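Why the prefix works can be sketched without any Kafka dependency. The snippet below is a rough stand-in, not the actual Connect code: withPrefixStripped is a hypothetical helper that mimics what originalsWithPrefix("consumer.") returns (matching keys with the prefix removed), and the worker properties are made-up example values.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerOverrideSketch {
    // Hypothetical stand-in for workerConfig.originalsWithPrefix("consumer."):
    // keeps only keys that start with the prefix and strips the prefix.
    static Map<String, Object> withPrefixStripped(Map<String, Object> workerProps, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : workerProps.entrySet()) {
            String key = e.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    // Same order as the quoted snippet: defaults first, then putAll,
    // so any prefixed worker setting replaces the default.
    static Map<String, Object> effectiveConsumerProps(Map<String, Object> workerProps) {
        Map<String, Object> props = new HashMap<>();
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.putAll(withPrefixStripped(workerProps, "consumer."));
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> workerProps = new HashMap<>();
        workerProps.put("bootstrap.servers", "localhost:9092"); // example value, no prefix
        workerProps.put("consumer.auto.offset.reset", "latest"); // prefixed override
        System.out.println(effectiveConsumerProps(workerProps).get("auto.offset.reset"));
    }
}
```

Because putAll runs after the defaults are set, the prefixed value ("latest" here) is the one the consumer ends up with, while unprefixed worker settings are ignored.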
With the Docker image, setting the CONNECT_CONSUMER_AUTO_OFFSET_RESET environment variable sets the consumer's auto.offset.reset.
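How the environment variable name relates to the worker property can be illustrated as below. This is a sketch under an assumption: that the image derives the property name by dropping the CONNECT_ prefix, lowercasing, and replacing underscores with dots. Any special cases the image may handle are ignored, and toWorkerProperty is a hypothetical helper, not part of the image.

```java
import java.util.Locale;

public class ConnectEnvNameSketch {
    // Assumed convention: CONNECT_FOO_BAR -> foo.bar
    static String toWorkerProperty(String envName) {
        String prefix = "CONNECT_";
        if (!envName.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a CONNECT_ variable: " + envName);
        }
        return envName.substring(prefix.length())
                .toLowerCase(Locale.ROOT)
                .replace('_', '.');
    }

    public static void main(String[] args) {
        // Prints the worker property that carries the 'consumer.' prefix.
        System.out.println(toWorkerProperty("CONNECT_CONSUMER_AUTO_OFFSET_RESET"));
    }
}
```

Under that assumption the variable maps to consumer.auto.offset.reset, which is the prefixed worker setting picked up by the putAll call quoted earlier.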