Kafka Producer transactions incrementing offsets twice for a message

Sampath Kumar

Jan 11, 2018, 4:05:43 PM
to Confluent Platform
We are trying to implement exactly-once semantics using transactions. When a message is pushed to a topic, the offset is incremented by two instead of one.

For example: my current offset is 10 and I push a message to the topic. The next offset should be 11, but it is updated to 12. The consumer receives only 1 message, and my consumer group reports a lag of 1 even though the consumer is still running. This happens only when we use transactions. When the next message is pushed, the offset moves to 14 and the consumer again receives only 1 message.

We are using Kafka 1.0 for both clients and broker.

my producer configs
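public Properties producerConfig() {
    LOGGER.info("Creating Dev Producer Configs");
    Properties configs = new Properties();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configs.put(ProducerConfig.ACKS_CONFIG, "all");
    configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
    configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // KafkaProducer<String, String> also needs a value serializer.
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Setting a transactional.id is what enables the transactional API on this producer.
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-transactional-example");
    return configs;
}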

package com.homedepot.mm.po.dcm.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaJavaProducer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJavaProducer.class);

    @Autowired
    @Qualifier(value = "producerConfig")
    Properties producerConfig;

    String topic;

    List<String> payloadList;

    public void sendToKafka(List<String> payloadList) {
        this.payloadList = payloadList;
    }

    public void run() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
        try {
            // Register the transactional.id with the coordinator once per producer.
            producer.initTransactions();
            payloadList.forEach(s -> {
                // One transaction per record: every commit appends a marker that takes one offset.
                producer.beginTransaction();
                producer.send(new ProducerRecord<String, String>(topic, s));
                producer.commitTransaction();
            });
        } catch (Exception e) {
            LOGGER.error("Error in sending transaction", e);
        } finally {
            producer.close();
        }
    }
}


my consumer configs

    public Properties consumerConfig() {
        LOGGER.info("Creating Dev Consumer Configs");
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction-test");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        // Only records from committed transactions are returned to the application.
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return configs;
    }


Sampath Kumar

Jan 11, 2018, 4:11:30 PM
to Confluent Platform

Matthias J. Sax

Jan 11, 2018, 8:32:35 PM
to confluent...@googlegroups.com
Each time you commit or abort a transaction, a commit/abort marker is
written into the corresponding partitions and requires one offset in the
log. Thus, as you commit after each send, this is expected behavior.
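
To make the arithmetic concrete, here is a minimal sketch that models a partition log in plain Java, without the Kafka client. The class `PartitionLogModel` and its methods are hypothetical names used only for illustration. The one rule it encodes is the one described above: each record takes one offset, and each commit or abort marker takes one more.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a single partition log; not part of the Kafka API.
final class PartitionLogModel {
    // Each entry is either "DATA" or "MARKER"; the list index plays the role of the offset.
    private final List<String> entries = new ArrayList<>();

    // Appends one data record and returns the offset it was written at.
    long appendRecord() {
        entries.add("DATA");
        return entries.size() - 1;
    }

    // Appends the commit/abort marker that ends a transaction and returns its offset.
    long appendMarker() {
        entries.add("MARKER");
        return entries.size() - 1;
    }

    // The next offset that will be assigned, i.e. the log end offset.
    long logEndOffset() {
        return entries.size();
    }

    // Writes `records` data records followed by one marker, modelling one transaction.
    void writeTransaction(int records) {
        for (int i = 0; i < records; i++) {
            appendRecord();
        }
        appendMarker();
    }

    public static void main(String[] args) {
        // Three messages, each in its own transaction: 3 records + 3 markers.
        PartitionLogModel perMessage = new PartitionLogModel();
        for (int i = 0; i < 3; i++) {
            perMessage.writeTransaction(1);
        }
        System.out.println(perMessage.logEndOffset()); // prints 6

        // The same three messages in a single transaction: 3 records + 1 marker.
        PartitionLogModel batched = new PartitionLogModel();
        batched.writeTransaction(3);
        System.out.println(batched.logEndOffset()); // prints 4
    }
}
```

In this model, committing after every send advances the log end offset by two per message, while one transaction around the whole batch adds a single extra offset.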

However, I am wondering why you commit after each message? That is not
required and has performance implications. Transactions are usually used
to do a multi-message transaction.

Having said this, it might be sufficient to use idempotent producer if
you don't intend to do multi-message transactions.
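
As a rough sketch of what an idempotent, non-transactional producer configuration might look like, the method below builds the properties with plain string keys so that it compiles without the Kafka client. The class and method names are hypothetical, and the broker address is taken from the configs earlier in this thread. Because no `transactional.id` is set, the producer writes no commit markers, but it also gives no way to roll back a group of records.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys and values are real Kafka settings.
final class IdempotentProducerConfigSketch {
    static Properties idempotentProducerConfig() {
        Properties configs = new Properties();
        configs.put("bootstrap.servers", "localhost:9092");
        // Idempotence requires acknowledgement from all in-sync replicas.
        configs.put("acks", "all");
        // Lets the broker de-duplicate retried sends from this producer.
        configs.put("enable.idempotence", "true");
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Deliberately no "transactional.id": without it the transactional API is not used.
        return configs;
    }
}
```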


Sampath Kumar

Jan 12, 2018, 10:25:40 AM
to Confluent Platform


Our use case is that we receive messages from MQ. Each message carries a list of 50-100 individual records that need to be posted to a Kafka topic. We need to ensure that all records for the message are sent to Kafka successfully and then send an acknowledgment to MQ. In case of failure, we need to roll back all records for that message and send a failure to MQ.

We were trying to implement this using transactions. I understand that the transaction marker itself takes an offset in the log. Our problem is that the consumer always shows lag because of this additional offset added for the transaction.

Is there a better way to implement our use case other than using transaction?


Matthias J. Sax

Jan 12, 2018, 4:29:30 PM
to confluent...@googlegroups.com
I see. If you get an input message from MQ and write many messages to Kafka,
transactions totally make sense to use.
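
A minimal sketch of that pattern, one transaction per MQ message, could look like the following. To keep it compilable without the Kafka client, `TxProducer` is a hypothetical stand-in whose method names mirror the transactional calls on `KafkaProducer`; `MqBatchPublisher` and `RecordingProducer` are likewise illustrative names. With the real client, some exceptions (for example a fenced producer) are fatal and call for closing the producer rather than aborting, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transactional calls on KafkaProducer.
interface TxProducer {
    void beginTransaction();
    void send(String topic, String value);
    void commitTransaction();
    void abortTransaction();
}

// Publishes all records of one MQ message inside a single transaction.
final class MqBatchPublisher {
    private final TxProducer producer;
    private final String topic;

    MqBatchPublisher(TxProducer producer, String topic) {
        this.producer = producer;
        this.topic = topic;
    }

    // Returns true if every record was committed (acknowledge MQ),
    // false if the transaction was aborted (report failure to MQ).
    boolean publish(List<String> records) {
        producer.beginTransaction();
        try {
            for (String record : records) {
                producer.send(topic, record);
            }
            // A single commit, so a single marker for the whole batch.
            producer.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            producer.abortTransaction();
            return false;
        }
    }
}

// In-memory fake that records the call sequence, used only to exercise the sketch.
final class RecordingProducer implements TxProducer {
    final List<String> calls = new ArrayList<>();

    public void beginTransaction() { calls.add("begin"); }

    public void send(String topic, String value) {
        if (value == null) {
            throw new IllegalArgumentException("null record");
        }
        calls.add("send:" + value);
    }

    public void commitTransaction() { calls.add("commit"); }

    public void abortTransaction() { calls.add("abort"); }
}
```

With 50-100 records per MQ message, this adds one marker offset per batch instead of one per record.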

> Our problem is consumer always
> has show lags because of this additional offset added for transaction.

Not sure if I understand this. Consumer lag should be independent of
transactions. After a transaction commits and a consumer received all
data, it should commit the offset after the commit marker and thus, lag
should not be affected by transactions.

> configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);

Sampath Kumar

Jan 15, 2018, 1:20:14 PM
to Confluent Platform

We figured out that auto-commit works fine and the issue occurs with manual commit only. On the consumer side, we are committing the offsets manually. My consumer code is given below.

#Kafka Consumer Configuration

public static void main(String[] arg) throws InterruptedException {
    context = SpringApplication.run(IPRKafkaConsumer.class);
    startConsumers();
}

public static void startConsumers() throws InterruptedException {
    List<KafkaJavaConsumer> consumers = new ArrayList<>();
    Integer numConsumer = context.getBean("concurrentConsumerCount", Integer.class);
    ExecutorService executor = Executors.newFixedThreadPool(numConsumer);

    // One KafkaConsumer instance per worker thread; KafkaConsumer is not thread-safe.
    for (int i = 0; i < numConsumer; i++) {
        KafkaJavaConsumer consumer = context.getBean(KafkaJavaConsumer.class);
        consumer.setConsumer(new KafkaConsumer<String, String>(context.getBean("consumerConfig", Properties.class)));
        consumers.add(consumer);
        executor.submit(consumer);
    }
}

public class KafkaJavaConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJavaConsumer.class);

    List<String> topics;

    String topic;

    private String kafkaConsumerOffset;
    Map<TopicPartition, OffsetAndMetadata> offsets;

    private KafkaConsumer<String, String> consumer;
    private String clientId;

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public KafkaConsumer<String, String> getConsumer() {
        return consumer;
    }

    public void setConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void run() {
        topics = new ArrayList<>();
        topics.add(topic);
        offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        // The consumer must be subscribed before the first poll.
        consumer.subscribe(topics);

        boolean recordUpdate = true;
        try {
            while (true) {
                ConsumerRecords<String, String> polledRecords = consumer.poll(60000);
                LOGGER.info("Size of Records polled " + polledRecords.count());
                recordUpdate = processRecords(polledRecords);
            }
        } catch (Exception e) {
            LOGGER.error("Error in polling records", e);
        } finally {
            consumer.close();
        }
    }

    private boolean processRecords(ConsumerRecords<String, String> polledRecords)
            throws IllegalAccessException, InvocationTargetException {
        boolean recordUpdate = true;
        for (ConsumerRecord<String, String> record : polledRecords) {
            try {
                recordUpdate = processRecord(record, 1);
            } catch (Exception e) {
                recordUpdate = false;
            }
            // Seek the remaining records from other partitions, to receive them in next poll
            if (!recordUpdate) {
                // First polled offset per partition.
                Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
                polledRecords.forEach(r ->
                        offsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()), k -> r.offset()));
                offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
                break;
            }
        }
        return recordUpdate;
    }


    private boolean processRecord(ConsumerRecord<String, String> record, int retryCount)
            throws IllegalAccessException, InvocationTargetException, InterruptedException {
        boolean recordUpdate = true;
        try {
            kafkaConsumerOffset = record.topic() + ":" + record.partition() + ":" + record.offset();
            LOGGER.info("Offset {" + kafkaConsumerOffset + "} Received in Kafka Consumer");
            // Commits the offset right after this record; a commit marker that follows it is not covered.
            OffsetAndMetadata metadata = new OffsetAndMetadata(record.offset() + 1);
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            offsets.put(partition, metadata);
            // ---business logic written here
            consumer.commitAsync(offsets, null);
        } finally {
            //consumer.commitAsync(offsets, null);
        }
        return recordUpdate;
    }
}

We posted a message to the topic with a transaction. For testing purposes we are using a single message. The offset was incremented by 2: 1 for the message and 1 for the transaction marker.

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic transaction-test

Pushed another message to same topic with transaction. Offset incremented by 2.

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic transaction-test

Consumer lag shows 4 when my consumer is down.

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-consumer-groups --bootstrap-server localhost:9092  --describe  --group transaction
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'transaction' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
transaction-test               0          4               8               4          -    

Started the consumer and lag says 1

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-consumer-groups --bootstrap-server localhost:9092  --describe  --group transaction
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
transaction-test               0          7               8               1          consumer-1-7a792d01-f3df-4e33-9ccc-636f8110ba38   /                 consumer-1
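
The numbers in the two tables can be reproduced with simple arithmetic. The sketch below assumes, based on the description above, that offsets 4 and 6 hold the two data records and offsets 5 and 7 hold their commit markers. The class and method names are hypothetical. The consumer code commits `record.offset() + 1`, which for the last record is 7, one short of the log end offset of 8.

```java
// Hypothetical helper that reproduces the LAG column: log end offset minus committed offset.
final class LagArithmetic {
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        long logEndOffset = 8;     // LOG-END-OFFSET reported by kafka-consumer-groups
        long lastRecordOffset = 6; // assumed: last data record; offset 7 is its commit marker

        // Consumer down, committed offset still 4.
        System.out.println(lag(logEndOffset, 4)); // prints 4

        // Consumer running, committing record.offset() + 1 for the last record.
        long committed = lastRecordOffset + 1;
        System.out.println(lag(logEndOffset, committed)); // prints 1

        // If the committed offset also covered the trailing marker, the lag would be zero.
        System.out.println(lag(logEndOffset, 8)); // prints 0
    }
}
```

One option that this thread does not confirm is to commit the value returned by `KafkaConsumer#position(TopicPartition)` after processing a batch instead of `record.offset() + 1`. Whether the position has already moved past the marker at that point is an assumption that would need to be verified against the client version in use.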

Sampath Kumar

Jul 20, 2018, 6:35:35 AM
to Confluent Platform
Hi Sampath/Mathias,

Can you please share more insight into how setting auto-commit to true solves the problem? In that case it is more of an at-most-once delivery than exactly-once. When the commit interval elapses, Kafka automatically commits the last used offset. If processing has not finished and the consumer crashes, then after a restart it resumes from the last committed offset, so the unprocessed messages are skipped. The consumer should commit an offset only when the transaction is complete.

We are also facing the same issue, where transactions increment the offset twice for a message and lag is shown even after all messages have been consumed. In our case auto-commit is set to false.

It is a bit confusing when tracking messages via describe. Is there any way to get rid of the lag caused by this additional offset added for the transaction?


Matthias J. Sax

Jul 20, 2018, 1:08:48 PM
to confluent...@googlegroups.com
Auto-commit is a consumer feature, and if you use Kafka transactions for
a read-process-write pattern, you would disable auto-commit on the
consumer and use `Producer#sendOffsetsToTransaction()` to commit
offsets instead.
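
The idea behind committing offsets through the producer can be sketched with a small in-memory model. `TxModel` and `ReadProcessWrite` are hypothetical names, and the model is not the Kafka API: the real call is `sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId)`, simplified here to a map from a partition name to an offset. The point it illustrates is that output records and consumed offsets become visible together on commit, or not at all on abort.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a transaction; method names mirror KafkaProducer.
final class TxModel {
    final List<String> committedOutput = new ArrayList<>();
    final Map<String, Long> committedOffsets = new HashMap<>();
    private final List<String> pendingOutput = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    void beginTransaction() {
        pendingOutput.clear();
        pendingOffsets.clear();
    }

    void send(String value) {
        pendingOutput.add(value);
    }

    // Stages the consumer offsets so they are committed with the output records.
    void sendOffsetsToTransaction(Map<String, Long> offsets) {
        pendingOffsets.putAll(offsets);
    }

    // Output and offsets become visible in one step.
    void commitTransaction() {
        committedOutput.addAll(pendingOutput);
        committedOffsets.putAll(pendingOffsets);
        pendingOutput.clear();
        pendingOffsets.clear();
    }

    // Nothing staged in this transaction becomes visible.
    void abortTransaction() {
        pendingOutput.clear();
        pendingOffsets.clear();
    }
}

final class ReadProcessWrite {
    // Processes one input record read at `offset` of `partition` and writes the result.
    static void handle(TxModel producer, String partition, long offset, String input) {
        producer.beginTransaction();
        try {
            producer.send(input.toUpperCase());
            Map<String, Long> offsets = new HashMap<>();
            offsets.put(partition, offset + 1); // next offset to read
            producer.sendOffsetsToTransaction(offsets);
            producer.commitTransaction();
        } catch (RuntimeException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}
```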

Note that Sampath receives data from an external system and wants to
write it into Kafka. This is a different scenario, and one would need to
write custom code to guarantee end-to-end exactly-once. Kafka
transactions only cover a topic-to-topic read-process-write pattern
out of the box.

