Hi,
1. I am not sure why you want to start from the beginning? Streams will
automatically recreate state on restart -- there is no reason for you to
go back manually.
Furthermore, `auto.offset.reset` has different semantics: Cf.
http://stackoverflow.com/documentation/apache-kafka/5449/consumer-groups-and-offset-management/19390/how-can-i-read-topic-from-its-beginning#t=201704172256023133618
(This holds for Java consumer as well as Streams, because Streams
internally uses Java consumer.)
If you really need to reset your Streams application to "zero", you can
use the Streams reset tool though:
http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool
It's highly recommend to read the corresponding blog post before using
the tool:
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
2. There is no API for this. And I am wondering why you would need this
in the first place?
What you can to however is, to track the committed offsets of your app.
A Streams app uses its `
application.id` as consumer `
group.id`. Thus,
you can monitor the lag using `bin/kafka-consumer-groups`
Hope this helps.
-Matthias
On 4/17/17 3:27 PM, Mark Shaw wrote:
> Hey all,
>
> I have a simple Kafka Streams setup with the following requirements but
> can't seem to get this nailed down.
>
> 1. On start, always read *stream 1 *from the earliest offset to recreate
> an internal state. *PROBLEM*: auto offset reset = earliest is the only
> solution I can find but isn't working. When I restart, it only picks up
> new messages.
>
> 2. Know when we have reached the max offset. *PROBLEM:* Can't track this
> down in the api (if it exists at all)
>
>
> objectConsumerStream {
>
> def main(args:Array[String]) = {
>
> val properties = new Properties()
> properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-consumer-stream-dag");
> properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde].getName)
> properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde].getName)
> properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
> properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "
http://localhost:8081");
>
> val builder = new KStreamBuilder
> val stream : KStream[GenericRecord, GenericRecord] = builder.stream("some-topic")
>
> stream.print()
>
> val dag = new KafkaStreams(builder, properties)
> dag start
> }
>
> }
>
> **//___^
>
> Any help would be greatly appreciated.
>
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to
confluent-platf...@googlegroups.com
> <mailto:
confluent-platf...@googlegroups.com>.
> To post to this group, send email to
confluent...@googlegroups.com
> <mailto:
confluent...@googlegroups.com>.
> To view this discussion on the web visit
>
https://groups.google.com/d/msgid/confluent-platform/3676a6a1-8935-49d2-bff2-48e65aa8375a%40googlegroups.com
> <
https://groups.google.com/d/msgid/confluent-platform/3676a6a1-8935-49d2-bff2-48e65aa8375a%40googlegroups.com?utm_medium=email&utm_source=footer>.
> For more options, visit
https://groups.google.com/d/optout.