i have a simple java kafka stream processing code to parse the JSON message and to a HTTP post. The code structure is given below:
public static void main(String[] args) {
try{
KafkaStreams streams = createStreams(args);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}catch(Exception e){
System.out.println("My exception comments.....");
}
}
public static KafkaStreams createStreams(String[] args) {
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, System.getenv("APPLICATION_ID_CONFIG"));
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv("BOOTSTRAP_SERVERS_CONFIG"));
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, System.getenv("ZOOKEEPER_CONNECT_CONFIG"));
streamsConfiguration.put(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "12000");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");
streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class);
final Serde<String> stringSerde = Serdes.String();
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(jsonSerializer, jsonDeserializer).getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> nodeRaw = builder.stream(stringSerde, jsonSerde, args);
nodeRaw.process(() -> new AbstractProcessor<String, JsonNode>() {
@Override
public void process(String time, JsonNode jMsg) {
try {
//Parse JSON value and do a HTTP post (fire and forget)
}
}
}
All these exceptions are printing in console and stream thread moving to not working state. (this need to forcefully restart the docker container)
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] failed to suspend stream tasks
at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
How can i catch these exception in my java code, instead of printing in console?