Kafka Streams consumer reading from random offsets even with auto.offset.reset=latest


SMJ

Apr 24, 2018, 7:16:34 PM
to Confluent Platform
In our Kafka Streams application, we change the group id and application id every time we run it, and we have auto.offset.reset=latest, but we're seeing the application read records from random offsets in the topic. How can we make sure the application reads only records that come in AFTER it connects to the Kafka topic?
1) If we're resetting the application id and group id every time, will it read from the first available offset (maybe offset 0, if those records still exist) or from the latest offset, i.e. only records coming in AFTER the Streams application connects to Kafka?
2) What offset tools give us information about the earliest, latest, and last committed offsets?
3) What we basically want is that the application does not start processing from offset 0; instead, it should only process the new data coming in. How can we ensure this?

Matthias J. Sax

Apr 25, 2018, 6:20:55 AM
to confluent...@googlegroups.com
In Kafka Streams, the application.id is used as the group.id -- thus,
it's just one config.

However, using a new application.id and setting
`auto.offset.reset="latest"` should actually do what you want.
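For example, a minimal sketch of that configuration (application id, broker address, and topic names here are only placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ResetConfigSketch {
    public static Properties props() {
        Properties props = new Properties();
        // application.id doubles as the consumer group.id in Kafka Streams
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-run-42");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // only applies when the group has no committed offsets yet
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
    }
}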

Can you share the configs of the application (either code, or from the logs)?

You can use the bin/kafka-consumer-groups command to inspect offsets.
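If you prefer to check programmatically, a plain consumer can report the same information, roughly like this (topic and group names are placeholders):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "exampleGroupId");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("test")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp); // null if nothing committed yet
                System.out.printf("%s earliest=%d latest=%d committed=%s%n",
                        tp, earliest.get(tp), latest.get(tp), committed);
            }
        }
    }
}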


-Matthias



SMJ

Apr 25, 2018, 2:02:24 PM
to Confluent Platform
These are the consumer settings:
heartbeat.interval.ms=3000
session.timeout.ms=120000
max.partition.fetch.bytes=52428800
fetch.min.bytes=1
fetch.max.bytes=2147483640
fetch.max.wait.ms=50
isolation.level=read_uncommitted
auto.offset.reset=latest
enable.auto.commit=true
auto.commit.interval.ms=2500
max.poll.interval.ms=300000
max.poll.records=500000
request.timeout.ms=305000

The streams settings are as follows:
num.stream.threads=30
replication.factor=1
processing.guarantee=at_least_once
commit.interval.ms=30000
poll.ms=100

We confirmed it is reading from a random offset by performing two runs:
1) For the first run, the input topic showed the following values for earliest and latest offset using the consumer-groups tool:

first offset: 422385
latest offset: 5794823
currently processing offset: 3683629

2) For the second run, the numbers were as follows:
first offset: 427154
latest offset: 5806004
currently processing offset: 5282839

We got these numbers from this tool:
bin/kafka-consumer-groups --bootstrap-server localhost:9092 --group exampleGroupId --describe


So the latest offset in run 1 was 5794823, and we'd expect run 2 to start processing from this number or higher (if new records kept coming into the topic), but run 2 started processing from 427154, which is not what we expected. Please let me know how we can fix this. Since the app starts processing from an early offset, we're seeing a huge lag. We want it to process only new records.
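Also, would pinning the reset policy on the source topic itself help, i.e. something roughly like the sketch below? (Not sure this overload exists in our version; in older 1.x releases Consumed may live in org.apache.kafka.streams rather than org.apache.kafka.streams.kstream.)

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class LatestOnlyTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // per-source reset policy; like auto.offset.reset it only kicks in
        // when the group has no committed offsets for this topic yet
        KStream<byte[], byte[]> input =
                builder.stream("test", Consumed.with(Topology.AutoOffsetReset.LATEST));
        // ... rest of the topology would go here ...
        return builder.build();
    }
}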

SMJ

Apr 25, 2018, 8:17:02 PM
to Confluent Platform
In addition to my previous reply, these are the streams logs:
[DEBUG] 2018-04-25 16:40:12,880 org.apache.kafka.clients.consumer.internals.Fetcher debug - [Consumer clientId=offset-check-run4-aa305d5f-895e-430c-b4c6-e0b3e73db305-StreamThread-24-consumer, groupId=offset-check-run4] Fetch READ_UNCOMMITTED at offset 0 for partition test-3 returned fetch data (error=NONE, highWaterMark=5924008, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=52428800)
[DEBUG] 2018-04-25 16:40:25,874 org.apache.kafka.clients.consumer.internals.Fetcher debug - [Consumer clientId=offset-check-run4-aa305d5f-895e-430c-b4c6-e0b3e73db305-StreamThread-24-consumer, groupId=offset-check-run4] Fetch READ_UNCOMMITTED at offset 27570 for partition test-3 returned fetch data (error=NONE, highWaterMark=5924008, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=52428800)
[DEBUG] 2018-04-25 16:40:37,331 org.apache.kafka.clients.consumer.internals.Fetcher debug - [Consumer clientId=offset-check-run4-aa305d5f-895e-430c-b4c6-e0b3e73db305-StreamThread-24-consumer, groupId=check-run4] Fetch READ_UNCOMMITTED at offset 55075 for partition test-3 returned fetch data (error=NONE, highWaterMark=5924008, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=52428800)


and so on. As we can see, it has the correct highWaterMark, which is the last offset, but it is still reading from offset 0. (Note: the latest offset values might not match the consumer tool values above because they are from different runs.)


SMJ

Apr 26, 2018, 1:01:20 PM
to Confluent Platform

Adding to the consumer configs, this is what I found in the logs (there are 30 of these, one created for each stream thread):
[INFO] 2018-04-25 16:27:38,609 org.apache.kafka.clients.consumer.ConsumerConfig logAll - ConsumerConfig values:
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = check-run4-aa305d5f-895e-430c-b4c6-e0b3e73db305-StreamThread-25-consumer
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 2147483640
fetch.min.bytes = 1
group.id = offset-check-run4
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 52428800
max.poll.records = 500000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamPartitionAssignor]
receive.buffer.bytes = 32768
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
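
On that note, is the consumer. prefix the right way to make sure these overrides reach the embedded consumers? Something like this sketch (values mirror the settings above, names are just illustrative):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PrefixedConsumerConfigSketch {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "offset-check-run4");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 30);
        // consumer-level overrides forwarded to the internal consumers via the "consumer." prefix
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 52428800);
        return props;
    }
}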
