Hi,
I am trying to write a simple Kafka Streams WordCount program.
I have ZooKeeper (3.5.2) and Kafka (0.10.1) running locally.
My Java code is as follows:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        // Streams configuration: application id, broker, ZooKeeper and default serdes
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        // Read lines from the "test" topic, split them into words and count each word
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .map((key, word) -> new KeyValue<>(word, word))
            .countByKey("test")
            .toStream();
        wordCounts.to(stringSerde, longSerde, "test");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        // Close the streams application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
I am getting the following exception:
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553)
at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:173)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:155)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:109)
at examplew.ordcount.WordCountProcessorJob.main(WordCountProcessorJob.java:112)
Caused by: java.lang.NoSuchFieldError: AuthFailed
at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:945)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:925)
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1230)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:97)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.<init>(InternalTopicManager.java:93)
at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.configure(StreamPartitionAssignor.java:124)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:239)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:658)
... 6 more
I have the following jars on my classpath:
jackson-annotations-2.8.5
jackson-core-2.8.5
jackson-databind-2.8.5
kafka-clients-0.10.1.1
kafka-streams-0.10.0.0
zkclient-0.9
zookeeper
slf4j-api-1.7.22
log4j-1.2.17
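
A NoSuchFieldError generally means that a class was compiled against a different version of a library than the one loaded at runtime. As a rough diagnostic sketch (the JarLocator class and its locationOf helpers are hypothetical names introduced here, not part of Kafka or ZooKeeper), something like the following could print which jar each of the classes involved in the stack trace above is actually loaded from, so the result can be compared against the jar list:

```java
import java.security.CodeSource;

// Hypothetical helper, for illustration only: reports where classes are loaded from.
public class JarLocator {

    // Returns the jar or directory a class was loaded from,
    // or a placeholder when the JVM does not expose a location.
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    // Looks the class up by name without initializing it,
    // so a missing or broken class does not abort the check.
    public static String locationOf(String className) {
        try {
            Class<?> type = Class.forName(className, false, JarLocator.class.getClassLoader());
            return locationOf(type);
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace and the jar list above;
        // org.apache.zookeeper.Watcher is the ZooKeeper class that holds the KeeperState enum.
        String[] names = {
            "org.apache.zookeeper.Watcher",
            "org.I0Itec.zkclient.ZkClient",
            "org.apache.kafka.streams.KafkaStreams",
            "org.apache.kafka.clients.consumer.KafkaConsumer"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOf(name));
        }
    }
}
```

Running this with the same classpath as WordCountJob should show whether the zookeeper jar that gets loaded is the version zkclient-0.9 expects, and whether the kafka-streams and kafka-clients jars come from matching releases.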