Start Kafka streams from a specific offset


peter...@gmail.com

Sep 1, 2017, 5:13:43 AM
to Confluent Platform
Hi,

Is it possible to start the streaming application from a custom offset instead of from either "earliest" or "latest"?
My idea is to persist (offset, payload timestamp) pairs on a daily basis. When we need to reprocess records from
day X, we can just use those offsets as starting points. The retention of our input Kafka topic is large enough that offsets
won't get invalidated frequently.
Is this the right way to achieve such "partial" reprocessing?
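A minimal sketch of the bookkeeping described above, assuming one checkpoint per day that maps each partition to the smallest offset whose payload timestamp falls on that day. OffsetCheckpoints and day_start_ms are hypothetical names, not part of any Kafka API, and a real version would persist the checkpoints somewhere durable. The lookup returns the latest checkpoint at or before the requested time, so restarting from it may re-read a few extra records but should not skip any record from day X.

```python
import bisect
from datetime import datetime, timezone


class OffsetCheckpoints:
    """Hypothetical store of daily start offsets, one {partition: offset} dict per day."""

    def __init__(self):
        self._days = []     # sorted day-start timestamps (epoch millis, UTC)
        self._offsets = []  # parallel list of {partition: offset}

    def record(self, day_start, offsets_by_partition):
        """Save the smallest offset per partition whose payload timestamp is on this day."""
        i = bisect.bisect_left(self._days, day_start)
        if i < len(self._days) and self._days[i] == day_start:
            self._offsets[i] = dict(offsets_by_partition)  # overwrite a re-recorded day
        else:
            self._days.insert(i, day_start)
            self._offsets.insert(i, dict(offsets_by_partition))

    def start_offsets_for(self, ts):
        """Return the offsets of the latest checkpoint at or before ts (epoch millis)."""
        i = bisect.bisect_right(self._days, ts) - 1
        if i < 0:
            raise LookupError("no checkpoint at or before the requested time")
        return dict(self._offsets[i])


def day_start_ms(year, month, day):
    """Epoch milliseconds for 00:00 UTC on the given date."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp()) * 1000


if __name__ == "__main__":
    cps = OffsetCheckpoints()
    cps.record(day_start_ms(2017, 8, 30), {0: 1000, 1: 980})
    cps.record(day_start_ms(2017, 8, 31), {0: 2500, 1: 2410})
    # A time during Aug 31 maps back to that day's checkpoint.
    print(cps.start_offsets_for(day_start_ms(2017, 8, 31) + 3_600_000))  # {0: 2500, 1: 2410}
```

Storing the smallest offset per day keeps the lookup conservative even if payload timestamps arrive slightly out of order within a partition.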

Thanks,
Peter

Matthias J. Sax

Sep 1, 2017, 1:08:03 PM
to confluent...@googlegroups.com
It's possible using the new offset-reset feature of the bin/kafka-consumer-groups tool.

Cf.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling

You would need to stop your Kafka Streams application, use the tool to
set the corresponding offsets (note, application.id == group.id), and
afterwards restart your Kafka Streams application. On startup, it will
pick up the committed offsets as set by the tool.
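A sketch of the reset step, assuming the per-partition start offsets are already known. It only builds argument lists for the kafka-consumer-groups tool using options described in KIP-122 (--bootstrap-server, --group, --topic topic:partition, --reset-offsets, --to-offset, --dry-run / --execute); the function name reset_commands, the broker address, and the topic and application names are placeholders. Check the exact script name and flags against the Kafka version you run (the Apache tarball ships it as bin/kafka-consumer-groups.sh). The group passed to the tool is the Streams application.id, and the application must be stopped so that the group is inactive.

```python
import shlex


def reset_commands(bootstrap, application_id, topic, offsets_by_partition,
                   tool="bin/kafka-consumer-groups", execute=False):
    """Build one kafka-consumer-groups invocation per partition (hypothetical helper).

    Kafka Streams uses application.id as the consumer group.id, so the
    group passed to the tool is the application id.
    """
    cmds = []
    for partition, offset in sorted(offsets_by_partition.items()):
        cmds.append([
            tool,
            "--bootstrap-server", bootstrap,
            "--group", application_id,
            "--topic", f"{topic}:{partition}",  # scope the reset to one partition
            "--reset-offsets",
            "--to-offset", str(offset),
            # --dry-run only prints the planned reset; --execute applies it.
            "--execute" if execute else "--dry-run",
        ])
    return cmds


if __name__ == "__main__":
    # Placeholder values; review the dry-run output before rerunning with execute=True.
    for cmd in reset_commands("localhost:9092", "my-streams-app", "input-topic",
                              {0: 2500, 1: 2410}):
        print(shlex.join(cmd))
```

Issuing one command per partition keeps each reset explicit when the partitions need different start offsets.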

Note that this might not result in a consistent state of your
application, though, as your application would reuse its current state
and the state will not be reset to the corresponding point (resetting
the state to a point back in time is not possible at the moment).

Maybe your application's semantics are resilient to this and it's just
fine for your application. If not, an alternative would be to wipe out
the state completely, using bin/kafka-streams-application-reset (together
with KafkaStreams#cleanUp()), and restart your application with an empty
state to do the reprocessing. You will still need to use
bin/kafka-consumer-groups to set the required start offsets.
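A toy model in plain Python (no Kafka involved) of the two approaches above. A count-per-key aggregation stands in for the application; the input data and function names are made up for illustration. Keeping the store and rewinding to offset 3 counts records 3 and 4 twice, while wiping the store and replaying from offset 3 yields counts that only reflect the replayed range.

```python
from collections import Counter

# Hypothetical input topic: (offset, key) pairs.
INPUT = [(0, "a"), (1, "b"), (2, "a"), (3, "a"), (4, "b")]


def process(store, start_offset):
    """Simulate a count-per-key aggregation reading from start_offset to the end."""
    for offset, key in INPUT:
        if offset >= start_offset:
            store[key] += 1
    return store


# Normal run: every record processed exactly once.
live_store = process(Counter(), 0)        # a=3, b=2

# Approach 1: rewind the committed offset to 3 but keep the existing store.
reused = process(Counter(live_store), 3)  # records 3 and 4 counted twice: a=4, b=3

# Approach 2: wipe the store, then replay from offset 3.
wiped = process(Counter(), 3)             # reflects only records 3..4: a=1, b=1
```

Neither result matches a store rolled back to offset 3 and then replayed, which is the point that state cannot be reset to a point in time; which outcome is acceptable depends on the application's semantics.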

Which approach is better depends on your application semantics.


-Matthias


peter...@gmail.com

Sep 5, 2017, 11:46:18 AM
to Confluent Platform
Hi Matthias,

Thanks for your help.
Are you referring to the changelog topic in the last section of your answer?
Would it be an option to keep track of (offset, timestamp) pairs for the changelog topic as well and
set them back if needed? Would the local store then be rebuilt up to this offset?

Thanks,
Peter

Matthias J. Sax

Sep 5, 2017, 12:51:53 PM
to confluent...@googlegroups.com
Yes. It's about the changelog.

Note that changelogs are stored in compacted topics. Thus, even if you can
figure out the correct offsets, _old_ data will get deleted via
compaction on the broker side. It might be possible to disable log
compaction, but you cannot just apply log retention, as this might result
in data loss. If you set the retention time to infinite to avoid data loss,
on the other hand, you get a topic that grows unbounded. So you are in
"bad shape" either way...

Also, it's quite a hard problem to figure out the correct offsets within
the changelog topic in the first place.
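A toy illustration of the compaction problem, under simplifying assumptions: compaction is modelled as "keep only the latest record per key", and tombstones, the active segment, and the timing of the log cleaner are ignored. The names and data are hypothetical. Once the changelog is compacted, restoring "up to offset 2" no longer reproduces the store as it was at offset 2, because the older values for those keys are gone.

```python
# Hypothetical changelog: (offset, key, value) records as a store would write them.
CHANGELOG = [(0, "a", 1), (1, "b", 1), (2, "a", 2), (3, "a", 3), (4, "b", 2)]


def restore(records, up_to_offset):
    """Rebuild a key-value store from changelog records with offset <= up_to_offset."""
    store = {}
    for offset, key, value in records:
        if offset <= up_to_offset:
            store[key] = value  # later records overwrite earlier ones
    return store


def compact(records):
    """Keep only the latest record per key (a simplified model of log compaction)."""
    latest = {}
    for offset, key, value in records:
        latest[key] = (offset, key, value)
    return sorted(latest.values())


compacted = compact(CHANGELOG)   # [(3, 'a', 3), (4, 'b', 2)]
print(restore(CHANGELOG, 2))     # {'a': 2, 'b': 1}: the true state at offset 2
print(restore(compacted, 2))     # {}: the history before offset 3 is gone
```

Restoring the latest state still works after compaction, which is exactly what compaction is designed to preserve; only the point-in-time history is lost.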

-Matthias

peter...@gmail.com

Sep 6, 2017, 8:24:07 AM
to Confluent Platform
Thanks Matthias for the valuable input.
--Peter