Streams + Earliest Offset + Max Offset

Mark Shaw

Apr 17, 2017, 6:27:21 PM
to Confluent Platform
Hey all,

I have a simple Kafka Streams setup with the following requirements but can't seem to get this nailed down.

1. On start, always read stream 1 from the earliest offset to recreate an internal state. PROBLEM: auto offset reset = earliest is the only solution I can find but isn't working. When I restart, it only picks up new messages.

2. Know when we have reached the max offset. PROBLEM: Can't track this down in the api (if it exists at all)


import java.util.Properties

import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder}

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
// plus an import for whichever GenericAvroSerde implementation you use

object ConsumerStream {

  def main(args: Array[String]): Unit = {

    val properties = new Properties()
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-consumer-stream-dag")
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde].getName)
    properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde].getName)
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")

    val builder = new KStreamBuilder
    val stream: KStream[GenericRecord, GenericRecord] = builder.stream("some-topic")

    // Print every record to stdout (debugging aid).
    stream.print()

    val dag = new KafkaStreams(builder, properties)
    dag.start()
  }
}


Any help would be greatly appreciated.

Matthias J. Sax

Apr 17, 2017, 7:02:06 PM
to confluent...@googlegroups.com
Hi,

1. I am not sure why you want to start from the beginning. Streams
will automatically recreate its state on restart -- there is no reason
for you to go back manually.

Furthermore, `auto.offset.reset` has different semantics than you
might expect; cf.
http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management/19390/how-can-i-read-topic-from-its-beginning#t=201704172256023133618

(This holds for Java consumer as well as Streams, because Streams
internally uses Java consumer.)
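
In other words, the reset policy only kicks in when the consumer group
has no committed offsets (or the committed offset is out of range), so
an app that has run before resumes from its last commit and ignores
the setting. As an illustration only -- a fragment of your Properties
setup with a hypothetical new `application.id`, which creates a fresh
consumer group and thus makes "earliest" apply again (at the cost of
abandoning the old group's progress and state):

// hypothetical new id -> new consumer group -> no committed offsets,
// so auto.offset.reset = "earliest" takes effect on first start
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-consumer-stream-dag-v2")
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")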


If you really need to reset your Streams application to "zero", you
can use the Streams reset tool:
http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool

It's highly recommended to read the corresponding blog post before
using the tool:
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
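
For reference, an invocation would look something like this (check the
docs above for the exact flags in your version):

bin/kafka-streams-application-reset --application-id my-consumer-stream-dag \
                                    --input-topics some-topic \
                                    --bootstrap-servers localhost:9092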


2. There is no API for this, and I am wondering why you would need it
in the first place.

What you can do, however, is track the committed offsets of your app.
A Streams app uses its `application.id` as its consumer `group.id`, so
you can monitor the lag using `bin/kafka-consumer-groups`.
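
For example (group name taken from your config above):

bin/kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group my-consumer-stream-dag

The LAG column shows, per partition, how far the committed offset
trails the log-end offset; zero lag on every partition means the app
has caught up.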


Hope this helps.


-Matthias



Mark Shaw

Apr 18, 2017, 8:22:18 AM
to Confluent Platform
Hey Matthias,

Thanks for your feedback. Here is some background: we have a reference data stream as an input to a micro-service, and we stream the entirety of that reference data to perform in-memory lookups. To do this, however, we need to be able to re-read the entire stream AND know when the re-read is complete, so that we can begin processing data from another stream. Knowing the max offset is therefore required; otherwise we may produce invalid data, because lookups may return null and alter the behavior of the service.

So with the low-level consumer, every time the service starts you can determine the max offset, seek back to the beginning, then read everything in until you hit that max offset. At that point you know you can begin processing data.
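
Roughly, in code (a sketch only -- topic name, group id, and deserializers below are placeholders, not our real ones):

import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_S ERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ref-data-loader")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val partitions = consumer.partitionsFor("reference-data").asScala
  .map(p => new TopicPartition(p.topic, p.partition)).asJava

consumer.assign(partitions)
val endOffsets = consumer.endOffsets(partitions).asScala  // the "max offset" per partition
consumer.seekToBeginning(partitions)

// Poll until every partition's position reaches the end offset captured above.
while (endOffsets.exists { case (tp, end) => consumer.position(tp) < end.longValue }) {
  consumer.poll(100).asScala.foreach { record =>
    // load the record into the in-memory lookup table here
  }
}
// Bootstrap complete: it is now safe to start processing the other stream.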

The question being, how do you solve this using Kafka Streams?  


Matthias J. Sax

Apr 18, 2017, 4:33:05 PM
to confluent...@googlegroups.com
I guess GlobalKTable is your friend for this case.

It will read up to "end-of-log" before any other processing starts.

http://docs.confluent.io/current/streams/concepts.html#streams-concepts-globalktable
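
A rough sketch of how that could look in your app (0.10.2-era API;
topic and store names are placeholders, the joiner is a stub, and the
lambdas assume Scala 2.12 SAM conversion):

import org.apache.avro.generic.GenericRecord
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.kstream.{GlobalKTable, KStream, KStreamBuilder}

val builder = new KStreamBuilder

// The reference data becomes a GlobalKTable; Streams restores it to
// end-of-log before processing of the event stream starts.
val refData: GlobalKTable[GenericRecord, GenericRecord] =
  builder.globalTable("reference-data", "reference-data-store")

val events: KStream[GenericRecord, GenericRecord] = builder.stream("events")

// Join each event against the fully loaded reference data, so a lookup
// cannot return null merely because the bootstrap is still in flight.
val enriched: KStream[GenericRecord, GenericRecord] = events.join(refData,
  (key: GenericRecord, _: GenericRecord) => key,        // map an event to its ref-data key
  (event: GenericRecord, ref: GenericRecord) => event)  // stub joiner: keep the event

enriched.to("enriched-events")

// `properties` as in the original post (Avro serdes as defaults).
new KafkaStreams(builder, properties).start()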

Hope this helps :)


-Matthias

Mark Shaw

Apr 28, 2017, 1:47:47 PM
to Confluent Platform
Thanks! That ended up being a useful solution.

