Kafka Producer transactions incrementing offsets twice for a message


Sampath Kumar

Jan 11, 2018, 4:05:43 PM
to Confluent Platform
We are trying to implement exactly-once semantics using transactions. When a message is pushed to the topic, the offset gets incremented by two instead of one.

Example: my current offset is 10 and I push a message to the topic. My next offset should be 11; however, the next offset is updated to 12. On the consumer side only 1 message is received, and my consumer group shows a lag of 1 even though my consumer is still running. This happens only when we use transactions. When the next message is pushed, the offset moves to 14 and the consumer again receives only 1 message.

We are using Kafka 1.0 for both clients and broker.

my producer configs
@Bean("producerConfig")
public Properties producerConfig() {
    LOGGER.info("Creating Dev Producer Configs");
    Properties configs = new Properties();
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configs.put(ProducerConfig.ACKS_CONFIG, "all");
    configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
    configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-transactional-example");
    return configs;
}

The producer Runnable:
package com.homedepot.mm.po.dcm.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

@Component
public class KafkaJavaProducer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJavaProducer.class);

    @Autowired
    @Qualifier(value = "producerConfig")
    Properties producerConfig;

    @Value("${kafka.producer.topic}")
    String topic;

    List<String> payloadList;

    public void sendToKafka(List<String> payloadList) {
        this.payloadList = payloadList;
        this.run();
    }

    @Override
    public void run() {
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
        producer.initTransactions();

        try {
            producer.beginTransaction();

            payloadList.forEach(s -> {
                producer.send(new ProducerRecord<String, String>(topic, s));
            });

            producer.commitTransaction();
        } catch (Exception e) {
            LOGGER.error("Error in sending transaction", e);
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}

my consumer configs

@Bean("consumerConfig")
    public Properties consumerConfig()
    {    
        LOGGER.info ("Creating Dev Consumer Configs");
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction-test"  );
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
        configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        return configs;

    }


Sampath Kumar

Jan 11, 2018, 4:11:30 PM
to Confluent Platform
Initially we tried with Spring transactions and thought the issue was with Spring Kafka:

https://stackoverflow.com/questions/48196671/spring-kafka-transaction-duplicate-messages-published-to-topic/48209970?noredirect=1#comment83412903_48209970

Matthias J. Sax

Jan 11, 2018, 8:32:35 PM
to confluent...@googlegroups.com
Each time you commit or abort a transaction, a commit/abort marker is
written into the corresponding partitions and requires one offset in the
log. Thus, since you commit after each send, this is expected behavior.

However, I am wondering why you commit after each message? That is not
required and has performance implications. Transactions are usually used
for multi-message transactions.

Having said this, it might be sufficient to use the idempotent producer if
you don't intend to do multi-message transactions.
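
For reference, a minimal sketch of an idempotence-only producer config (illustrative, adapted from the config earlier in this thread, not a drop-in replacement). Without a transactional.id no commit/abort markers are written, so the log-end offset advances by exactly one per record:

Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // broker de-duplicates producer retries per partition
configs.put(ProducerConfig.ACKS_CONFIG, "all");               // required when idempotence is enabled
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Note: no TRANSACTIONAL_ID_CONFIG and no initTransactions()/beginTransaction()/commitTransaction()
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);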


-Matthias

On 1/11/18 1:11 PM, Sampath Kumar wrote:
> Initially we tried with Spring transaction and thought issue with Spring
> kafka.
>
> https://stackoverflow.com/questions/48196671/spring-kafka-transaction-duplicate-messages-published-to-topic/48209970?noredirect=1#comment83412903_48209970

Sampath Kumar

Jan 12, 2018, 10:25:40 AM
to Confluent Platform

Matthias,

Our use case is that we receive messages from MQ. Each message contains a list of 50-100 individual records which need to be posted to a Kafka topic. We need to ensure all records for the message are sent to Kafka successfully and then send an acknowledgment to MQ. In case of failure, we need to roll back all records for that message and send a failure to MQ.

We were trying to implement this using transactions. I understand the transaction itself gets an offset in the log. Our problem is that the consumer always shows lag because of this additional offset added for the transaction.

Is there a better way to implement our use case other than using transactions?

Thanks



Matthias J. Sax

Jan 12, 2018, 4:29:30 PM
to confluent...@googlegroups.com
I see. If you get an input message from MQ and write many messages to Kafka,
it makes total sense to use transactions.
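
For reference, a minimal sketch of that MQ-to-Kafka pattern (the mqMessage, extractRecords(), mqAck(), and mqNack() names are hypothetical placeholders for the MQ side, not from this thread; the Kafka calls are the standard transactional producer API):

producer.initTransactions();                          // once per producer instance
try {
    producer.beginTransaction();
    for (String record : extractRecords(mqMessage)) { // the 50-100 records from one MQ message
        producer.send(new ProducerRecord<>(topic, record));
    }
    producer.commitTransaction();                     // one commit marker per batch, not per record
    mqAck(mqMessage);                                 // acknowledge MQ only after the commit succeeds
} catch (KafkaException e) {
    producer.abortTransaction();                      // read_committed consumers never see aborted records
    mqNack(mqMessage);                                // let MQ redeliver the message
}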

> Our problem is that the consumer always shows lag because of this
> additional offset added for the transaction.

Not sure if I understand this. Consumer lag should be independent of
transactions. After a transaction commits and a consumer has received all of
its data, it should commit the offset after the commit marker, and thus lag
should not be affected by transactions.

-Matthias

Sampath Kumar

Jan 15, 2018, 1:20:14 PM
to Confluent Platform
Matthias,

We figured out that auto-commit works fine and the issue is with manual commit only. On the consumer side we are manually committing the offsets. My consumer code is given below.

#Kafka Consumer Configuration
spring.kafka.consumer.bootstrapservers=localhost:9092
spring.kafka.consumer.concurrentconsumercount=1
spring.kafka.consumer.autooffsetreset=earliest
spring.kafka.consumer.groupid=transaction
spring.kafka.consumer.enableautocommit=false
spring.kafka.consumer.maxpollrecordsconfig=10
spring.kafka.consumer.isolation_level_config=read_committed


public static void main(String[] arg) throws InterruptedException {
    context = SpringApplication.run(IPRKafkaConsumer.class);
    startConsumers();
}

public static void startConsumers() throws InterruptedException {
    List<KafkaJavaConsumer> consumers = new ArrayList<>();
    Integer numConsumer = context.getBean("concurrentConsumerCount", Integer.class);
    ExecutorService executor = Executors.newFixedThreadPool(numConsumer);

    for (int i = 0; i < numConsumer; i++) {
        KafkaJavaConsumer consumer = context.getBean(KafkaJavaConsumer.class);
        consumer.setConsumer(new KafkaConsumer<String, String>(context.getBean("consumerConfig", Properties.class)));
        consumer.setClientId("Worker" + i);
        consumers.add(consumer);
        executor.submit(consumer);
    }
}


public class KafkaJavaConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJavaConsumer.class);

    List<String> topics;

    @Value("${kafka.consumer.topic}")
    String topic;

    private String kafkaConsumerOffset;
    Map<TopicPartition, OffsetAndMetadata> offsets;

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public KafkaConsumer<String, String> getConsumer() {
        return consumer;
    }

    public void setConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    private KafkaConsumer<String, String> consumer;
    private String clientId;

    public void run() {
        topics = new ArrayList<>();
        topics.add(topic);
        consumer.subscribe(topics);
        offsets = new HashMap<TopicPartition, OffsetAndMetadata>();

        boolean recordUpdate = true;
        try {
            while (true) {
                ConsumerRecords<String, String> polledRecords = consumer.poll(60000);
                LOGGER.info("Size of Records polled " + polledRecords.count());
                recordUpdate = processRecords(polledRecords);
            }
        } catch (Exception e) {
            LOGGER.error("Error in polling records", e);
        } finally {
            consumer.close();
        }
    }

    private boolean processRecords(ConsumerRecords<String, String> polledRecords)
            throws IllegalAccessException, InvocationTargetException {
        boolean recordUpdate = true;
        for (ConsumerRecord<String, String> record : polledRecords) {
            try {
                recordUpdate = processRecord(record, 1);
            } catch (Exception e) {
                recordUpdate = false;
                break;
            }
        }
        // Seek the remaining records from other partitions, to receive them in the next poll
        if (!recordUpdate) {
            Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
            polledRecords.forEach(r ->
                offsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()), k -> r.offset()));
            offsets.forEach(consumer::seek);
        }
        return recordUpdate;
    }

    private boolean processRecord(ConsumerRecord<String, String> record, int retryCount)
            throws IllegalAccessException, InvocationTargetException, InterruptedException {
        boolean recordUpdate = true;
        try {
            kafkaConsumerOffset = record.topic() + ":" + record.partition() + ":" + record.offset();
            LOGGER.info("Offset {" + kafkaConsumerOffset + "} Received in Kafka Consumer");
            OffsetAndMetadata metadata = new OffsetAndMetadata(record.offset() + 1);
            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
            offsets.put(partition, metadata);
            // --- business logic written here

            consumer.commitAsync(offsets, null);
        } finally {
            // consumer.commitAsync(offsets, null);
        }
        return recordUpdate;
    }
}
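
The residual lag of 1 comes from committing record.offset() + 1 for the last record in a transaction: that offset points at the transaction's commit marker, which a read_committed consumer skips but which still counts toward the log-end offset. One possible workaround (a sketch, not from the original thread) is to commit the consumer's position once a poll has been fully processed, since the position typically advances past the marker:

// Sketch: commit the consumer's current position after every record returned by
// poll() has been processed successfully. In read_committed mode the position
// typically points past any transaction markers, so the committed offset can match
// the log-end offset and the reported lag drops to 0.
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (TopicPartition tp : consumer.assignment()) {
    toCommit.put(tp, new OffsetAndMetadata(consumer.position(tp)));
}
consumer.commitAsync(toCommit, null);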

We posted a message to the topic with a transaction. For testing purposes we are using a single message. The offset incremented by 2: 1 for the message and 1 for the transaction marker.

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic transaction-test
transaction-test:0:6

Pushed another message to the same topic with a transaction. The offset again incremented by 2.

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic transaction-test
transaction-test:0:8

Consumer lag shows 4 when my consumer is down.

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-consumer-groups --bootstrap-server localhost:9092  --describe  --group transaction
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'transaction' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
transaction-test               0          4               8               4          -    

Started the consumer and lag says 1

D25SP18BGQ17IM:confluent-4.0.0 mxp8051$ ./bin/kafka-consumer-groups --bootstrap-server localhost:9092  --describe  --group transaction
Note: This will not show information about old Zookeeper-based consumers.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
transaction-test               0          7               8               1          consumer-1-7a792d01-f3df-4e33-9ccc-636f8110ba38   /172.22.52.231                 consumer-1

Thanks
Sampath Kumar





jayesh...@springernature.com

Jul 20, 2018, 6:35:35 AM
to Confluent Platform
Hi Sampath/Matthias,

Can you please share more insight into how setting auto-commit to true solves the problem? In that case it's more of at-most-once delivery instead of exactly-once: once the commit interval has elapsed, Kafka automatically commits the last used offset. If processing was not yet done and the consumer crashed, then when the consumer restarts it starts receiving messages from the last committed offset. The consumer should commit the offset only when the transaction is complete.

We are also facing the same issue, where transactions increment offsets twice for a message and show lag even after consuming all the messages. In our case auto-commit is set to false.

It's a bit confusing when tracking lag via describe; is there any way to get rid of the lag caused by this additional offset added for the transaction?

Thanks,
Jayesh

Matthias J. Sax

Jul 20, 2018, 1:08:48 PM
to confluent...@googlegroups.com
Auto-commit is a consumer feature, and if you use Kafka transactions for
a read-process-write pattern, you would disable auto-commit on the
consumer and use `Producer#sendOffsetsToTransaction()` to commit
offsets instead.
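
For illustration, a minimal sketch of that read-process-write pattern (the output topic, the group id, and the process() helper are placeholders, not from this thread; the consumer uses enable.auto.commit=false and isolation.level=read_committed, the producer has a transactional.id and initTransactions() already called):

ConsumerRecords<String, String> records = consumer.poll(1000);
try {
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic", process(record.value())));
        offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
    }
    // Commit the consumed offsets as part of the same transaction,
    // instead of calling consumer.commitSync()/commitAsync().
    producer.sendOffsetsToTransaction(offsetsToCommit, "my-consumer-group");
    producer.commitTransaction();
} catch (KafkaException e) {
    producer.abortTransaction();
}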

Note that Sampath receives data from an external system and wants to
write it into Kafka. This is a different scenario, and one would need to
write custom code to guarantee end-to-end exactly-once. Kafka
transactions only cover a topic-to-topic read-process-write pattern
out of the box.


-Matthias