Question on UncaughtExceptionHandler: "bug" in streams.close()?

Simon Teles

Feb 26, 2017, 6:30:59 PM
to Confluent Platform
Hello,

I just updated to 0.10.2 to fix the stream not closing properly: https://groups.google.com/forum/#!topic/confluent-platform/uPTSwvMGXd0

It's better, but...

The code I use is this:

public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-ks-1");
    properties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
    properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1");

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream(Serdes.String(), Serdes.String(), "test.ks.input")
        .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                // Throws NumberFormatException for non-numeric values, which ends the stream thread.
                Integer.parseInt(value);
                return new KeyValue<String, String>(key, value);
            }
        })
        .to(Serdes.String(), Serdes.String(), "test.ks.output");

    KafkaStreams streams = new KafkaStreams(builder, properties);
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread arg0, Throwable arg1) {
            arg1.printStackTrace();
            // This call is the one that never returns (see the first log below).
            streams.close();
        }
    });
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

So I use setUncaughtExceptionHandler to do some work and then close the stream. It doesn't work.

Log:
10:19:02.015 INFO  [] [o.a.k.s.p.i.StreamTask|initTopology|l.333] ~~ task [0_0] Initializing processor nodes of the topology
10:19:02.041 ERROR [] [o.a.k.s.p.i.StreamThread|run|l.376] ~~ stream-thread [StreamThread-1] Streams application error during processing: 
java.lang.NumberFormatException: For input string: "test"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_121]
at java.lang.Integer.parseInt(Integer.java:580) ~[na:1.8.0_121]
at java.lang.Integer.parseInt(Integer.java:615) ~[na:1.8.0_121]
at TestKsStop$1.apply(TestKsStop.java:26) ~[bin/:na]
at TestKsStop$1.apply(TestKsStop.java:1) ~[bin/:na]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) ~[kafka-streams-0.10.2.0.jar:na]
10:19:02.041 INFO  [] [o.a.k.s.p.i.StreamThread|shutdown|l.397] ~~ stream-thread [StreamThread-1] Shutting down
10:19:02.042 INFO  [] [o.a.k.s.p.i.StreamThread|apply|l.1045] ~~ stream-thread [StreamThread-1] Closing a task 0_0
10:19:02.045 INFO  [] [o.a.k.s.p.i.StreamThread|apply|l.544] ~~ stream-thread [StreamThread-1] Flushing state stores of task 0_0
10:19:02.045 INFO  [] [o.a.k.s.p.i.StreamThread|apply|l.523] ~~ stream-thread [StreamThread-1] Closing the state manager of task 0_0
10:19:02.045 INFO  [] [o.a.k.c.p.KafkaProducer|close|l.689] ~~ Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
10:19:02.051 INFO  [] [o.a.k.s.p.i.StreamThread|removeStreamTasks|l.1019] ~~ stream-thread [StreamThread-1] Removing all active tasks [[0_0]]
10:19:02.051 INFO  [] [o.a.k.s.p.i.StreamThread|removeStandbyTasks|l.1034] ~~ stream-thread [StreamThread-1] Removing all standby tasks [[]]
10:19:02.051 INFO  [] [o.a.k.s.p.i.StreamThread|shutdown|l.427] ~~ stream-thread [StreamThread-1] Stream thread shutdown complete
10:19:02.051 WARN  [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from RUNNING to NOT_RUNNING
java.lang.NumberFormatException: For input string: "test"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at TestKsStop$1.apply(TestKsStop.java:26)
at TestKsStop$1.apply(TestKsStop.java:1)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
10:19:02.052 INFO  [] [o.a.k.s.p.i.StreamThread|close|l.387] ~~ stream-thread [StreamThread-1] Informed thread to shut down
10:19:02.053 WARN  [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN

And it stays like this.

But if I comment out the streams.close() call in the uncaught exception handler, it works!

Log:
10:24:35.080 INFO  [] [o.a.k.s.p.i.StreamTask|initTopology|l.333] ~~ task [0_0] Initializing processor nodes of the topology
10:24:35.107 ERROR [] [o.a.k.s.p.i.StreamThread|run|l.376] ~~ stream-thread [StreamThread-1] Streams application error during processing: 
java.lang.NumberFormatException: For input string: "test"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_121]
at java.lang.Integer.parseInt(Integer.java:580) ~[na:1.8.0_121]
at java.lang.Integer.parseInt(Integer.java:615) ~[na:1.8.0_121]
at TestKsStop$1.apply(TestKsStop.java:26) ~[bin/:na]
at TestKsStop$1.apply(TestKsStop.java:1) ~[bin/:na]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641) ~[kafka-streams-0.10.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) ~[kafka-streams-0.10.2.0.jar:na]
10:24:35.108 INFO  [] [o.a.k.s.p.i.StreamThread|shutdown|l.397] ~~ stream-thread [StreamThread-1] Shutting down
10:24:35.108 INFO  [] [o.a.k.s.p.i.StreamThread|apply|l.1045] ~~ stream-thread [StreamThread-1] Closing a task 0_0
10:24:35.110 INFO  [] [o.a.k.s.p.i.StreamThread|apply|l.544] ~~ stream-thread [StreamThread-1] Flushing state stores of task 0_0
10:24:35.111 INFO  [] [o.a.k.s.p.i.StreamThread|apply|l.523] ~~ stream-thread [StreamThread-1] Closing the state manager of task 0_0
10:24:35.111 INFO  [] [o.a.k.c.p.KafkaProducer|close|l.689] ~~ Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
10:24:35.117 INFO  [] [o.a.k.s.p.i.StreamThread|removeStreamTasks|l.1019] ~~ stream-thread [StreamThread-1] Removing all active tasks [[0_0]]
10:24:35.117 INFO  [] [o.a.k.s.p.i.StreamThread|removeStandbyTasks|l.1034] ~~ stream-thread [StreamThread-1] Removing all standby tasks [[]]
10:24:35.117 INFO  [] [o.a.k.s.p.i.StreamThread|shutdown|l.427] ~~ stream-thread [StreamThread-1] Stream thread shutdown complete
10:24:35.117 WARN  [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from RUNNING to NOT_RUNNING
java.lang.NumberFormatException: For input string: "test"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at TestKsStop$1.apply(TestKsStop.java:26)
at TestKsStop$1.apply(TestKsStop.java:1)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
10:24:35.119 INFO  [] [o.a.k.s.p.i.StreamThread|close|l.387] ~~ stream-thread [StreamThread-1] Informed thread to shut down
10:24:35.120 WARN  [] [o.a.k.s.p.i.StreamThread|setState|l.160] ~~ Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN
10:24:35.120 INFO  [] [o.a.k.s.KafkaStreams|run|l.488] ~~ Stopped Kafka Streams process.

Is this the way it is supposed to work?

I think the stream should not transition to the PENDING_SHUTDOWN state if it is already NOT_RUNNING. Something like this:

    /**
     * Shutdown this stream thread.
     */
    public synchronized void close() {
        if (state == State.NOT_RUNNING) {
            log.info("{} Stream already not running, close not needed", logPrefix);
        } else {
            log.info("{} Informed thread to shut down", logPrefix);
            setState(State.PENDING_SHUTDOWN);
        }
    }


Simon Teles

Feb 27, 2017, 4:13:21 PM
to Confluent Platform
No one? :)

Eno Thereska

Feb 27, 2017, 4:42:44 PM
to Confluent Platform
Hi Simon,

Sounds like a bug. Would you mind filing a JIRA (copy-pasting the above suffices)? Meanwhile the workaround is not to call close() in the handler, as you have already discovered.
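
A minimal sketch of one way to apply that workaround, using only the JDK: the handler just records the failure and signals, and a different thread (here the main thread) does the close. FakeStreams is a hypothetical stand-in for KafkaStreams, and the assumption that close() ends up waiting for the failed thread is mine, not something confirmed about the Kafka internals.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

final class HandlerHandoffDemo {

    /** Hypothetical stand-in for KafkaStreams: close() waits for the worker thread to end. */
    static final class FakeStreams {
        private final Thread worker;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        FakeStreams(Runnable task) {
            this.worker = new Thread(task, "worker-1");
        }

        void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler handler) {
            worker.setUncaughtExceptionHandler(handler);
        }

        void start() {
            worker.start();
        }

        void close() throws InterruptedException {
            // An untimed join never returns if the caller is the worker itself,
            // which is the situation inside an uncaught exception handler.
            worker.join();
            closed.set(true);
        }

        boolean isClosed() {
            return closed.get();
        }
    }

    /** Returns true if the failure was reported and close() completed on the calling thread. */
    static boolean runDemo() {
        CountDownLatch failed = new CountDownLatch(1);
        AtomicReference<Throwable> cause = new AtomicReference<>();

        // Same failure as in the original post: parsing a non-numeric value.
        FakeStreams streams = new FakeStreams(() -> Integer.parseInt("test"));
        streams.setUncaughtExceptionHandler((thread, error) -> {
            cause.set(error);     // record the failure, but do not close here
            failed.countDown();   // wake up the thread that owns the shutdown
        });
        streams.start();

        try {
            if (!failed.await(5, TimeUnit.SECONDS)) {
                return false;
            }
            streams.close();      // runs on the caller's thread, not on the failed worker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return streams.isClosed() && cause.get() instanceof NumberFormatException;
    }

    public static void main(String[] args) {
        System.out.println("closed cleanly: " + runDemo());
    }
}
```

In a real application the main thread would block on the latch after streams.start() and then call streams.close() itself.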

Thanks
Eno

Eno Thereska

Feb 27, 2017, 5:07:49 PM
to Confluent Platform
Actually Matthias pointed out there is a Jira already: https://issues.apache.org/jira/browse/KAFKA-4787

Thanks
Eno

Damian Guy

Feb 28, 2017, 3:18:24 AM
to Confluent Platform
You can use the close method with a timeout. This will avoid the deadlock, but it won't be able to shut down the thread that caused the issue (as it already holds the lock).
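
To illustrate the trade-off, here is a JDK-only sketch of the general pattern, not the Kafka Streams internals. A bounded wait issued from inside the handler returns once the timeout expires, but it cannot confirm that the calling thread stopped, because that thread is the one still running the handler. The class and method names are hypothetical. In Kafka Streams itself, I believe the corresponding call in 0.10.2 is the close(long, TimeUnit) overload.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class TimedCloseDemo {

    /**
     * Waits at most timeoutMs for the thread to end and reports whether it ended.
     * timeoutMs must be positive: Thread.join(0) means "wait forever".
     */
    static boolean closeWithTimeout(Thread worker, long timeoutMs) throws InterruptedException {
        worker.join(timeoutMs);
        return !worker.isAlive();
    }

    /**
     * Runs a failing worker whose handler performs a timed close on its own thread.
     * Returns true if the handler returned (no hang) and the timed close reported
     * that the calling thread had not been shut down.
     */
    static boolean timedCloseReturnsWithoutStoppingCaller(long timeoutMs) {
        AtomicBoolean handlerFinished = new AtomicBoolean(false);
        AtomicBoolean shutdownConfirmed = new AtomicBoolean(true);

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated processing error");
        }, "worker-1");

        worker.setUncaughtExceptionHandler((thread, error) -> {
            try {
                // The handler runs on the failed thread, so this waits on itself
                // and gives up after timeoutMs instead of blocking forever.
                shutdownConfirmed.set(closeWithTimeout(thread, timeoutMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            handlerFinished.set(true);
        });
        worker.start();

        try {
            // Generous bound so the caller never hangs, even if something goes wrong.
            worker.join(timeoutMs + 5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return handlerFinished.get() && !shutdownConfirmed.get() && !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("handler returned without hanging: "
                + timedCloseReturnsWithoutStoppingCaller(200));
    }
}
```

The failed thread only ends after the handler returns, which is why the timed close cannot report a clean shutdown from inside the handler.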
