[KafkaStream 0.10.1.0] Impossible to close the stream properly, bug?

Simon Teles

Feb 20, 2017, 9:19:40 PM
to Confluent Platform
Hello,

I'm trying to test a simple task: closing the stream when an unhandled exception is caught. And it doesn't work :/

The code I use:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;

public class TestKsStop {

    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.master-sln:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-ks-1");
        properties.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1");

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), "test.ks.input")
            .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                @Override
                public KeyValue<String, String> apply(String key, String value) {
                    // Throws NumberFormatException on purpose for non-numeric values
                    Integer.parseInt(value);
                    return new KeyValue<String, String>(key, value);
                }
            })
            .to(Serdes.String(), Serdes.String(), "test.ks.output");

        KafkaStreams streams = new KafkaStreams(builder, properties);
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread arg0, Throwable arg1) {
                arg1.printStackTrace();
                // This call runs on the failing stream thread and never returns
                streams.close();
            }
        });
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

I produce a NumberFormatException on purpose with the Integer.parseInt(value) call.
It is properly caught by the UncaughtExceptionHandler, but the stream doesn't fully shut down.

The log:
13:00:03.195 ERROR [o.a.k.s.p.i.StreamThread|run|l.249] ~~ stream-thread [StreamThread-1] Streams application error during processing: 
java.lang.NumberFormatException: For input string: "test"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_121]
at java.lang.Integer.parseInt(Integer.java:580) ~[na:1.8.0_121]
at java.lang.Integer.parseInt(Integer.java:615) ~[na:1.8.0_121]
at TestKsStop$1.apply(TestKsStop.java:26) ~[bin/:na]
at TestKsStop$1.apply(TestKsStop.java:1) ~[bin/:na]
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436) ~[kafka-streams-0.10.1.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
13:00:03.195 INFO  [o.a.k.s.p.i.StreamThread|shutdown|l.268] ~~ stream-thread [StreamThread-1] Shutting down
13:00:03.195 INFO  [o.a.k.s.p.i.StreamThread|apply|l.358] ~~ stream-thread [StreamThread-1] Committing consumer offsets of task 0_0
13:00:03.195 INFO  [o.a.k.s.p.i.StreamThread|apply|l.751] ~~ stream-thread [StreamThread-1] Closing a task 0_0
13:00:03.195 DEBUG [o.a.k.s.p.i.StreamTask|close|l.328] ~~ task [0_0] Closing processor topology
13:00:03.196 INFO  [o.a.k.s.p.i.StreamThread|apply|l.368] ~~ stream-thread [StreamThread-1] Flushing state stores of task 0_0
13:00:03.196 INFO  [o.a.k.s.p.i.StreamThread|apply|l.347] ~~ stream-thread [StreamThread-1] Closing the state manager of task 0_0
13:00:03.196 DEBUG [o.a.k.c.c.KafkaConsumer|unsubscribe|l.885] ~~ Unsubscribed all topics or patterns and assigned partitions
13:00:03.196 INFO  [o.a.k.c.p.KafkaProducer|close|l.685] ~~ Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
13:00:03.196 DEBUG [o.a.k.c.p.i.Sender|run|l.141] ~~ Beginning shutdown of Kafka producer I/O thread, sending remaining records.
13:00:03.197 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name connections-closed:
13:00:03.197 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name connections-created:
13:00:03.197 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-sent-received:
13:00:03.197 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-sent:
13:00:03.197 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-received:
13:00:03.198 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name select-time:
13:00:03.198 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name io-time:
13:00:03.198 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node--1.bytes-sent
13:00:03.198 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node--1.bytes-received
13:00:03.198 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node--1.latency
13:00:03.198 DEBUG [o.a.k.c.p.i.Sender|run|l.164] ~~ Shutdown of Kafka producer I/O thread has completed.
13:00:03.199 DEBUG [o.a.k.c.p.KafkaProducer|close|l.727] ~~ The Kafka producer has closed.
13:00:03.200 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name connections-closed:
13:00:03.200 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name connections-created:
13:00:03.200 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-sent-received:
13:00:03.200 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-sent:
13:00:03.201 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-received:
13:00:03.201 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name select-time:
13:00:03.201 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name io-time:
13:00:03.201 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node--1.bytes-sent
13:00:03.201 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node--1.bytes-received
13:00:03.201 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node--1.latency
13:00:03.201 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node-2147482646.bytes-sent
13:00:03.202 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node-2147482646.bytes-received
13:00:03.202 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node-2147482646.latency
13:00:03.202 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node-1001.bytes-sent
13:00:03.202 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node-1001.bytes-received
13:00:03.202 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name node-1001.latency
13:00:03.202 DEBUG [o.a.k.c.c.KafkaConsumer|close|l.1505] ~~ The Kafka consumer has closed.
13:00:03.202 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name connections-closed:
13:00:03.203 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name connections-created:
13:00:03.203 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-sent-received:
13:00:03.203 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-sent:
13:00:03.203 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name bytes-received:
13:00:03.203 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name select-time:
13:00:03.203 DEBUG [o.a.k.c.m.Metrics|removeSensor|l.315] ~~ Removed sensor with name io-time:
13:00:03.203 DEBUG [o.a.k.c.c.KafkaConsumer|close|l.1505] ~~ The Kafka consumer has closed.
13:00:03.203 INFO  [o.a.k.s.p.i.StreamThread|removeStreamTasks|l.725] ~~ stream-thread [StreamThread-1] Removing all active tasks [[0_0]]
13:00:03.204 INFO  [o.a.k.s.p.i.StreamThread|removeStandbyTasks|l.740] ~~ stream-thread [StreamThread-1] Removing all standby tasks [[]]
13:00:03.204 INFO  [o.a.k.s.p.i.StreamThread|shutdown|l.292] ~~ stream-thread [StreamThread-1] Stream thread shutdown complete
13:00:03.204 DEBUG [o.a.k.s.KafkaStreams|close|l.218] ~~ Stopping Kafka Stream process
java.lang.NumberFormatException: For input string: "test"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at TestKsStop$1.apply(TestKsStop.java:26)
at TestKsStop$1.apply(TestKsStop.java:1)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

And it stays like this. The process never finishes, and the log message "Stopped Kafka Stream process" never appears (from KafkaStreams:212).

Any idea why?

Simon Teles

Feb 20, 2017, 9:26:41 PM
to Confluent Platform
The thread dump:

2017-02-21 13:26:02
Full thread dump OpenJDK 64-Bit Server VM (25.121-b13 mixed mode):

"DestroyJavaVM" #14 prio=5 os_prio=0 tid=0x00007f5724011000 nid=0x64e waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"StreamThread-1" #11 prio=5 os_prio=0 tid=0x00007f5724896800 nid=0x67a in Object.wait() [0x00007f56f4c06000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ff620000> (a org.apache.kafka.streams.processor.internals.StreamThread)
at java.lang.Thread.join(Thread.java:1249)
- locked <0x00000000ff620000> (a org.apache.kafka.streams.processor.internals.StreamThread)
at java.lang.Thread.join(Thread.java:1323)
at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:227)
- locked <0x00000000ff6eb1c0> (a org.apache.kafka.streams.KafkaStreams)
at TestKsStop$2.uncaughtException(TestKsStop.java:38)
at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)

"Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007f57240ca000 nid=0x664 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=0 tid=0x00007f57240c5000 nid=0x663 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f57240c0800 nid=0x662 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f57240bf000 nid=0x661 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f57240bc000 nid=0x660 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f57240ba000 nid=0x65f waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f5724092800 nid=0x65e in Object.wait() [0x00007f56f6ceb000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ff6107f8> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000000ff6107f8> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f572408e000 nid=0x65d in Object.wait() [0x00007f56f6dec000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ff600b88> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000ff600b88> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"VM Thread" os_prio=0 tid=0x00007f5724086800 nid=0x65c runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f5724026000 nid=0x653 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f5724027800 nid=0x654 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f5724029800 nid=0x655 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f572402b000 nid=0x656 runnable 

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007f572402d000 nid=0x657 runnable 

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007f572402e800 nid=0x658 runnable 

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007f5724030800 nid=0x659 runnable 

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007f5724032000 nid=0x65a runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f57240cc800 nid=0x666 waiting on condition 

JNI global references: 253

Heap
 PSYoungGen      total 38400K, used 8076K [0x00000000fd580000, 0x0000000100000000, 0x0000000100000000)
  eden space 33280K, 8% used [0x00000000fd580000,0x00000000fd864718,0x00000000ff600000)
  from space 5120K, 99% used [0x00000000ff600000,0x00000000ffafeb20,0x00000000ffb00000)
  to   space 5120K, 0% used [0x00000000ffb00000,0x00000000ffb00000,0x0000000100000000)
 ParOldGen       total 87552K, used 731K [0x00000000f8000000, 0x00000000fd580000, 0x00000000fd580000)
  object space 87552K, 0% used [0x00000000f8000000,0x00000000f80b6ce8,0x00000000fd580000)
 Metaspace       used 12199K, capacity 12438K, committed 12544K, reserved 1060864K
  class space    used 1459K, capacity 1509K, committed 1536K, reserved 1048576K
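
A note on reading the dump above: "StreamThread-1" is blocked in Thread.join() on a StreamThread object, reached from KafkaStreams.close(), which was itself called from uncaughtException(). In other words, the stream thread appears to be waiting for its own termination. The plain-Java sketch below reproduces that situation without Kafka. The class and method names (SelfJoinDemo, selfJoinTimesOut) are made up for illustration, and the join is given a timeout only so the demo terminates; an untimed join() would block indefinitely, as in the dump.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class SelfJoinDemo {

    /**
     * Starts a worker that throws, and lets its uncaught-exception handler
     * try to join the worker itself. Returns true if the worker was still
     * alive once the bounded join gave up.
     */
    static boolean selfJoinTimesOut(long timeoutMs) {
        AtomicBoolean stillAlive = new AtomicBoolean(false);
        CountDownLatch handlerDone = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            throw new NumberFormatException("For input string: \"test\"");
        }, "worker");

        // The handler runs ON the failing thread, so 't' is the current thread.
        worker.setUncaughtExceptionHandler((t, e) -> {
            try {
                t.join(timeoutMs);              // a thread waiting for itself
                stillAlive.set(t.isAlive());    // it cannot have terminated yet
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            } finally {
                handlerDone.countDown();
            }
        });

        worker.start();
        try {
            handlerDone.await();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return stillAlive.get();
    }

    public static void main(String[] args) {
        System.out.println("still alive after self-join: " + selfJoinTimesOut(200));
    }
}
```

The thread only terminates after the handler returns, so a handler that waits for that termination can never finish on its own.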

Matthias J. Sax

Feb 21, 2017, 12:22:23 AM
to confluent...@googlegroups.com
I assume it's this bug: https://issues.apache.org/jira/browse/KAFKA-4366

It is fixed in 0.10.2, which will be released this week or next.


-Matthias
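
For anyone who cannot upgrade yet, one possible workaround (not confirmed in this thread, and not tested against 0.10.1) is to avoid calling close() on the failing thread and hand it off to a separate thread instead; in the reproducer that would mean something like new Thread(streams::close).start() inside the handler. The sketch below shows the pattern in plain Java. Service is a hypothetical stand-in for an object whose close() joins its worker thread, and AsyncCloseDemo and closesCleanly are illustrative names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncCloseDemo {

    /** Hypothetical stand-in for a service whose close() joins its worker thread. */
    static final class Service {
        final Thread worker;
        final CountDownLatch closed = new CountDownLatch(1);

        Service(Runnable body) {
            worker = new Thread(body, "worker");
        }

        void close() {
            try {
                worker.join();          // blocks until the worker has terminated
                closed.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns true if close() completed within waitMs and the worker is dead. */
    static boolean closesCleanly(long waitMs) {
        Service service = new Service(() -> {
            throw new NumberFormatException("For input string: \"test\"");
        });

        // Hand close() to another thread so the handler can return and the
        // failing worker can actually terminate.
        service.worker.setUncaughtExceptionHandler((t, e) ->
            new Thread(service::close, "closer").start());

        service.worker.start();
        try {
            return service.closed.await(waitMs, TimeUnit.MILLISECONDS)
                && !service.worker.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("closed cleanly: " + closesCleanly(2000));
    }
}
```

The handler returns immediately here, so the worker finishes and the join inside close() can complete on the other thread.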

Simon Teles

Feb 21, 2017, 12:26:43 AM
to confluent...@googlegroups.com
Oh, thanks for the answer!

My bad for not checking JIRA first, it won't happen again...

Thanks ;)
