/**
 * Starts a minimal Kafka Streams application that reads String records from
 * {@code test.ks.input}, checks that each value parses as an integer, and
 * forwards the record unchanged to {@code test.ks.output}.
 *
 * <p>A value that is not a valid integer makes {@link Integer#parseInt(String)}
 * throw {@link NumberFormatException} inside the mapper. The exception reaches
 * the uncaught-exception handler installed below, which prints it and closes
 * the application.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-ks-1");
    properties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
    properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1");

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream(Serdes.String(), Serdes.String(), "test.ks.input")
            .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                @Override
                public KeyValue<String, String> apply(String key, String value) {
                    // Validation only: the parsed number is deliberately discarded.
                    // A non-numeric value throws NumberFormatException here.
                    Integer.parseInt(value);
                    return new KeyValue<String, String>(key, value);
                }
            })
            .to(Serdes.String(), Serdes.String(), "test.ks.output");

    final KafkaStreams streams = new KafkaStreams(builder, properties);

    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread failedThread, Throwable cause) {
            cause.printStackTrace();
            // This handler runs on the stream thread that just failed. close() blocks
            // until the stream threads have stopped, so calling it directly would make
            // the failing thread wait for its own termination. Hand the close over to
            // a separate thread instead.
            new Thread(streams::close, "kafka-streams-close-on-error").start();
        }
    });

    // Register the hook before starting so that a termination signal arriving
    // right after start() still triggers a clean close.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close, "kafka-streams-shutdown-hook"));

    streams.start();
}
10:19:02.015 INFO [] [o.a.k.s.p.i.StreamTask|initTopology|l.333] ~~ task [0_0] Initializing processor nodes of the topology10:19:02.041 ERROR [] [o.a.k.s.p.i.StreamThread|run|l.376] ~~ stream-thread [StreamThread-1] Streams application error during processing: java.lang.NumberFormatException: For input string: "test" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_121] at java.lang.Integer.parseInt(Integer.java:580) ~[na:1.8.0_121] at java.lang.Integer.parseInt(Integer.java:615) ~[na:1.8.0_121] at TestKsStop$1.apply(TestKsStop.java:26) ~[bin/:na] at TestKsStop$1.apply(TestKsStop.java:1) ~[bin/:na] at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) ~[kafka-streams-0.10.2.0.jar:na]10:19:02.041 INFO [] [o.a.k.s.p.i.StreamThread|shutdown|l.397] ~~ stream-thread [StreamThread-1] Shutting down10:19:02.042 INFO [] [o.a.k.s.p.i.StreamThread|apply|l.1045] ~~ stream-thread 
[StreamThread-1] Closing a task 0_010:19:02.045 INFO [] [o.a.k.s.p.i.StreamThread|apply|l.544] ~~ stream-thread [StreamThread-1] Flushing state stores of task 0_010:19:02.045 INFO [] [o.a.k.s.p.i.StreamThread|apply|l.523] ~~ stream-thread [StreamThread-1] Closing the state manager of task 0_010:19:02.045 INFO [] [o.a.k.c.p.KafkaProducer|close|l.689] ~~ Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.10:19:02.051 INFO [] [o.a.k.s.p.i.StreamThread|removeStreamTasks|l.1019] ~~ stream-thread [StreamThread-1] Removing all active tasks [[0_0]]10:19:02.051 INFO [] [o.a.k.s.p.i.StreamThread|removeStandbyTasks|l.1034] ~~ stream-thread [StreamThread-1] Removing all standby tasks [[]]10:19:02.051 INFO [] [o.a.k.s.p.i.StreamThread|shutdown|l.427] ~~ stream-thread [StreamThread-1] Stream thread shutdown complete10:19:02.051 WARN [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from RUNNING to NOT_RUNNINGjava.lang.NumberFormatException: For input string: "test" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at TestKsStop$1.apply(TestKsStop.java:26) at TestKsStop$1.apply(TestKsStop.java:1) at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)10:19:02.052 INFO [] [o.a.k.s.p.i.StreamThread|close|l.387] ~~ stream-thread [StreamThread-1] Informed thread to shut down10:19:02.053 WARN [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN
0:24:35.080 INFO [] [o.a.k.s.p.i.StreamTask|initTopology|l.333] ~~ task [0_0] Initializing processor nodes of the topology10:24:35.107 ERROR [] [o.a.k.s.p.i.StreamThread|run|l.376] ~~ stream-thread [StreamThread-1] Streams application error during processing: java.lang.NumberFormatException: For input string: "test" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_121] at java.lang.Integer.parseInt(Integer.java:580) ~[na:1.8.0_121] at java.lang.Integer.parseInt(Integer.java:615) ~[na:1.8.0_121] at TestKsStop$1.apply(TestKsStop.java:26) ~[bin/:na] at TestKsStop$1.apply(TestKsStop.java:1) ~[bin/:na] at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) ~[kafka-streams-0.10.2.0.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) ~[kafka-streams-0.10.2.0.jar:na]10:24:35.108 INFO [] [o.a.k.s.p.i.StreamThread|shutdown|l.397] ~~ stream-thread [StreamThread-1] Shutting down10:24:35.108 INFO [] [o.a.k.s.p.i.StreamThread|apply|l.1045] ~~ stream-thread 
[StreamThread-1] Closing a task 0_010:24:35.110 INFO [] [o.a.k.s.p.i.StreamThread|apply|l.544] ~~ stream-thread [StreamThread-1] Flushing state stores of task 0_010:24:35.111 INFO [] [o.a.k.s.p.i.StreamThread|apply|l.523] ~~ stream-thread [StreamThread-1] Closing the state manager of task 0_010:24:35.111 INFO [] [o.a.k.c.p.KafkaProducer|close|l.689] ~~ Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.10:24:35.117 INFO [] [o.a.k.s.p.i.StreamThread|removeStreamTasks|l.1019] ~~ stream-thread [StreamThread-1] Removing all active tasks [[0_0]]10:24:35.117 INFO [] [o.a.k.s.p.i.StreamThread|removeStandbyTasks|l.1034] ~~ stream-thread [StreamThread-1] Removing all standby tasks [[]]10:24:35.117 INFO [] [o.a.k.s.p.i.StreamThread|shutdown|l.427] ~~ stream-thread [StreamThread-1] Stream thread shutdown complete10:24:35.117 WARN [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from RUNNING to NOT_RUNNINGjava.lang.NumberFormatException: For input string: "test" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at TestKsStop$1.apply(TestKsStop.java:26) at TestKsStop$1.apply(TestKsStop.java:1) at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)10:24:35.119 INFO [] [o.a.k.s.p.i.StreamThread|close|l.387] ~~ stream-thread [StreamThread-1] Informed thread to shut down10:24:35.120 WARN [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN10:24:35.120 INFO [] [o.a.k.s.KafkaStreams|run|l.488] ~~ Stopped Kafka Streams process.
Thanks
Eno
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/d56ce7ca-c29f-4bea-b443-e041855759a0%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.