Well... I think I read Guillermo's message too fast :-)
It works now!
Here is my working code:
package fr.ludovicmartin.openheatflow;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
/**
 * Minimal Kafka Streams example: subscribes to the {@code "test"} topic with
 * String keys and values and prints every record value to standard output.
 * The application keeps running until a line is read from standard input.
 */
public class Application {

    /**
     * Configures and starts the stream, waits for a line on standard input,
     * then closes the stream.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws IOException {
        //Stream configuration
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "zouzou");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "pi1.home.XXXX:9092");
        prop.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "pi1.home.XXXX:2181");
        prop.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //Start streaming
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("test");
        // Only the record value is printed; the key is ignored.
        stream.foreach((k, v) -> {
            System.out.println(v);
        });

        KafkaStreams streams = new KafkaStreams(builder, prop);
        // Print any exception that escapes a stream thread instead of losing it.
        streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
            System.out.println(e);
        });
        streams.start();
        try {
            // Block the main thread until a line (or end of input) arrives on stdin.
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            // Always shut the streams instance down, even if reading stdin failed.
            streams.close();
        }
    }
}
Many thanks to everyone for the help!
Ludovic