I have a sink connector deployed to a Kafka Connect cluster. The connector processes infrequent jobs: messages may arrive on the topic only once every 2 weeks. When no messages arrive for 7 days, my messages get reprocessed from the beginning of the topic. It is because of my
Map<String, Object> props = new HashMap();
props.put("group.id", SinkUtils.consumerGroupId(this.id.connector()));
props.put("bootstrap.servers", Utils.join(this.workerConfig.getList("bootstrap.servers"), ","));
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.putAll(this.workerConfig.originalsWithPrefix("consumer."));
try {
    KafkaConsumer<byte[], byte[]> newConsumer = new KafkaConsumer(props);
    return newConsumer;
} catch (Throwable var4) {
    throw new ConnectException("Failed to create consumer", var4);
}
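One detail in the snippet above is worth isolating: `auto.offset.reset` is set to `earliest` first, and `props.putAll(...)` with the `consumer.`-prefixed worker settings runs afterwards, so a worker-level `consumer.auto.offset.reset` should take precedence over the default. The following is only a minimal sketch of that ordering using plain maps. The class name `ConsumerPropsSketch` and the method `buildConsumerProps` are hypothetical and are not part of Kafka Connect, and the `workerOverrides` map stands in for what `originalsWithPrefix("consumer.")` returns.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the consumer setup shown above; not Kafka Connect code.
public class ConsumerPropsSketch {

    // workerOverrides models the worker's "consumer."-prefixed settings
    // with the prefix already stripped (an assumption of this sketch).
    static Map<String, Object> buildConsumerProps(Map<String, Object> workerOverrides) {
        Map<String, Object> props = new HashMap<>();
        // Defaults, in the same order as the original snippet.
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        // Applied last, so any key present here replaces the default above.
        props.putAll(workerOverrides);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("auto.offset.reset", "latest");

        // With an override, the worker-level value wins.
        System.out.println(buildConsumerProps(overrides).get("auto.offset.reset"));
        // Without an override, the default "earliest" stays in place.
        System.out.println(buildConsumerProps(new HashMap<>()).get("auto.offset.reset"));
    }
}
```

If that reading is right, the reset policy only matters once the group has no committed offset left, which is what I assume happens after the long idle period. Note that `latest` would skip anything produced while no offset exists, so it trades reprocessing for possible message loss.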