Kafka Streams Java API: catching stream/client exceptions in Java code


jayant...@gmail.com

Oct 20, 2017, 3:32:15 AM10/20/17
to Confluent Platform
Hi All,

I have a simple Java Kafka Streams application that parses JSON messages and does an HTTP POST. The code structure is given below:

public static void main(String[] args) {
    try {
        KafkaStreams streams = createStreams(args);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    } catch (Exception e) {
        System.out.println("My exception comments.....");
    }
}
public static KafkaStreams createStreams(String[] args) {
    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, System.getenv("APPLICATION_ID_CONFIG"));
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, System.getenv("ZOOKEEPER_CONNECT_CONFIG"));
    streamsConfiguration.put(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "12000");
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");
    streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class);

    final Serde<String> stringSerde = Serdes.String();
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, JsonNode> nodeRaw = builder.stream(stringSerde, jsonSerde, args);

    nodeRaw.process(() -> new AbstractProcessor<String, JsonNode>() {
        @Override
        public void process(String time, JsonNode jMsg) {
            try {
                // Parse the JSON value and do an HTTP POST (fire and forget)
            } catch (Exception e) {
                // Handle per-record failures here
            }
        }
    });

    return new KafkaStreams(builder, streamsConfiguration);
}

I am running this code as a Docker container.

Query:
I am unable to catch any Kafka stream/client API exceptions in my Java code. These exceptions are printed to the console and the stream thread moves to a not-running state (which requires a forced restart of the Docker container).

Example exceptions: 
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] failed to suspend stream tasks
        at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
.....
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)

How can I catch these exceptions in my Java code instead of having them printed to the console?

Regards,
Jayanth

Damian Guy

Oct 20, 2017, 4:34:47 AM10/20/17
to Confluent Platform
Hi,

For any exceptions that aren't handled by Streams, you can set an uncaught exception handler, i.e., KafkaStreams#setUncaughtExceptionHandler.
Thanks,
Damian
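
To make that concrete, here is a minimal sketch against the main method from the original post (the handler body -- logging and the shutdown comment -- is illustrative; adapt it to your own recovery strategy):

```java
KafkaStreams streams = createStreams(args);

// Register the handler before start(), so exceptions thrown on the
// stream threads (e.g. the rebalance StreamsException above) reach
// your code instead of only being printed to the console.
streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
    System.err.println("Stream thread " + t.getName() + " died: " + e);
    // React here: log/alert, or shut the application down so the
    // container orchestrator restarts it. Note that closing from
    // inside the handler can block, so a bounded close is safer,
    // e.g. streams.close(10, TimeUnit.SECONDS).
});

streams.start();
```

Note the handler only notifies you after the thread has died; it does not resurrect the stream thread.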

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/9a6b9ee3-6219-4b9c-a1c2-4f7354236d80%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

jayant...@gmail.com

Oct 21, 2017, 12:55:51 AM10/21/17
to Confluent Platform
Thank you Damian for the help. 

Michael Noll

Oct 23, 2017, 6:18:57 AM10/23/17
to confluent...@googlegroups.com
Also: with the upcoming Kafka version 1.0 (should be released in the next few days), you will also have better support for handling deserialization failures, i.e., what Kafka Streams should do in case your application attempts to read a corrupted JSON message from Kafka. The final documentation for this new functionality is not yet available (it will be with the 1.0 release), but in the meantime take a look at [1], which is the design document for that new functionality.

Note that this is really about deserialization-related failures -- if your application fails in other areas you still need to use the UncaughtExceptionHandler as Damian suggested, use try-catch, etc.
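
For example, based on the design document, the new mechanism is expected to be configured roughly like this once 1.0 ships (config constant and handler class taken from the design doc; treat both as subject to change until the release):

```java
// Log and skip records whose value cannot be deserialized (e.g. corrupted
// JSON) instead of letting the failure kill the stream thread.
streamsConfiguration.put(
    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    LogAndContinueExceptionHandler.class.getName());
```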

Best,
Michael
