kafka streams word count


Skrzypek, Jonathan

unread,
Feb 23, 2018, 1:03:43 PM2/23/18
to confluent...@googlegroups.com

Hi,

 

I’ve been trying the kafka streams word count example and have been unable to get any data in the output topic.

Topics are configured as explained, and there is data continuously flowing in the input topic.

 

When the stream app runs it just doesn’t send any data to the output topic (debug shows there is no ProduceRequest going out).

I’m using 0.10.2.1

 

I’ve tried a couple of samples, from:

https://docs.confluent.io/current/streams/quickstart.html

https://github.com/apache/kafka/blob/0.10.2/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java

https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java

 

The app starts properly, with no errors in the logs.

 

Interestingly, when modifying the sample to do a very simple builder.stream("IN_TOPIC").to("OUT_TOPIC") to ship records without transformation, it works fine and I can see messages flowing in the output topic.

Any ideas?

Matthias J. Sax

unread,
Feb 23, 2018, 6:30:52 PM2/23/18
to confluent...@googlegroups.com
How long did you wait?

I guess it's related to KTable caching:
https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html

Either wait for more than 30 seconds, decrease the commit interval to
a lower value, or disable caching (either globally by setting the cache
size to 0, or locally by passing a `Materialized` parameter into count()).
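For reference, a minimal sketch of the two config options using plain Properties (the string keys behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; you would pass these props into the KafkaStreams constructor):

```java
import java.util.Properties;

public class CacheConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Disable record caching entirely so every update is forwarded downstream
        // (same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)
        props.put("cache.max.bytes.buffering", "0");
        // Alternatively, keep caching but flush more often than the 30s default
        props.put("commit.interval.ms", "100");
        System.out.println(props.getProperty("cache.max.bytes.buffering")); // prints "0"
    }
}
```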


-Matthias
> --
> You received this message because you are subscribed to the Google
> Groups "Confluent Platform" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to confluent-platf...@googlegroups.com
> <mailto:confluent-platf...@googlegroups.com>.
> To post to this group, send email to confluent...@googlegroups.com
> <mailto:confluent...@googlegroups.com>.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/confluent-platform/2d8b4f90889c4469bc9e5b7e7c9d208d%40gsdgeup05etn1.firmwide.corp.gs.com
> <https://groups.google.com/d/msgid/confluent-platform/2d8b4f90889c4469bc9e5b7e7c9d208d%40gsdgeup05etn1.firmwide.corp.gs.com?utm_medium=email&utm_source=footer>.
> For more options, visit https://groups.google.com/d/optout.


Skrzypek, Jonathan

unread,
Feb 26, 2018, 6:29:38 AM2/26/18
to confluent...@googlegroups.com
I waited a fair bit, but following your suggestion I set CACHE_MAX_BYTES_BUFFERING_CONFIG to 0.
I still don't see data in the output topic.

Interestingly, I don't think it even creates the internal topics, as a streams reset gives me:

Resetting offsets to zero for input topics [IN_TOPIC] and all internal topics.
[2018-02-26 05:51:33,520] WARN Error while fetching metadata with correlation id 2 : {streams-wordcount-Counts-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2018-02-26 05:51:33,628] WARN Error while fetching metadata with correlation id 7 : {streams-wordcount-Counts-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2018-02-26 05:51:33,629] WARN The following subscribed topics are not assigned to any members in the group streams-wordcount : [streams-wordcount-Counts-repartition] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-02-26 05:51:33,730] WARN Error while fetching metadata with correlation id 10 : {streams-wordcount-Counts-repartition=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
Done.
Deleting all internal/auto-created topics for application streams-wordcount
Topic streams-wordcount-Counts-repartition is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Done.

I've debugged further and noticed the flatMapValues operator is never fed records from the KStream.

KStream<String, String> source = builder.stream("IN_TOPIC");
KTable<String, Long> wordCounts = source
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value)
    .count("Counts");

The debugger breakpoint never hits the lambda value -> Arrays.asList(...).
But it does if I remove groupBy and count from the above, or if I do something like:

KStream<String, String> source = builder.stream("IN_TOPIC");
source.mapValues(v -> v.replace("a","z"));

Tried using 0.11.0.2 but got the same behaviour.
I don't get what I'm missing.
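For what it's worth, the splitting lambda itself can be checked in isolation, outside of Streams (plain Java, no Kafka dependency; the input string is a made-up example):

```java
import java.util.Arrays;
import java.util.List;

public class SplitSketch {
    public static void main(String[] args) {
        String value = "Hello Kafka streams";
        // Same logic as the flatMapValues lambda in the word count example
        List<String> words = Arrays.asList(value.toLowerCase().split("\\W+"));
        System.out.println(words); // [hello, kafka, streams]
    }
}
```

So the lambda logic is fine on its own, which points at the topology wiring rather than the split itself.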

Jonathan Skrzypek

Matthias J. Sax

unread,
Feb 26, 2018, 12:25:45 PM2/26/18
to confluent...@googlegroups.com
Not sure...

Is the input topic name correct? And does the topic contain data? Is the
reset policy "earliest"? Did you try changing the app.id (it should not
be necessary since you reset the app -- just to make sure)?

What do the logs show? Maybe increase the log level to DEBUG.

-Matthias

Skrzypek, Jonathan

unread,
Feb 27, 2018, 5:41:57 AM2/27/18
to confluent...@googlegroups.com
Thanks for the suggestions.
The topic name and data are correct, as builder.stream("IN_TOPIC").to("OUT_TOPIC") works properly.
Which also rules out authorization issues, I think.

Offset reset policy is earliest.
Tried to change the app id without success.

I inspected the logs and they never mention OUT_TOPIC; I don't think the app even requests metadata for it.
It does, however, when doing the simple builder.stream("IN_TOPIC").to("OUT_TOPIC").
When doing a simple pass-through stream I see the streams topology being defined, sensors being added, etc.:

2018-02-27 10:23:04.214 [StreamThread-1] DEBUG StreamPartitionAssignor:606 - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
2018-02-27 10:23:04.214 [StreamThread-1] INFO StreamPartitionAssignor:640 - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
2018-02-27 10:23:04.217 [StreamThread-1] DEBUG StreamPartitionAssignor:386 - stream-thread [StreamThread-1] Created repartition topics [] from the parsed topology.
2018-02-27 10:23:04.218 [StreamThread-1] DEBUG StreamPartitionAssignor:606 - stream-thread [StreamThread-1] Starting to validate internal topics in partition assignor.
2018-02-27 10:23:04.218 [StreamThread-1] INFO StreamPartitionAssignor:640 - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
2018-02-27 10:23:04.219 [StreamThread-1] DEBUG StreamPartitionAssignor:461 - stream-thread [StreamThread-1] Created state changelog topics {} from the parsed topology.

2018-02-27 10:23:05.287 [kafka-producer-network-thread | streams-wordcount2-1855c093-5482-4874-81c0-8daedb62684b-StreamThread-1-producer] DEBUG Metrics:335 - Added sensor with name topic.OUT_TOPIC.records-per-batch
2018-02-27 10:23:05.288 [kafka-producer-network-thread | streams-wordcount2-1855c093-5482-4874-81c0-8daedb62684b-StreamThread-1-producer] DEBUG Metrics:335 - Added sensor with name topic.OUT_TOPIC.bytes
2018-02-27 10:23:05.288 [kafka-producer-network-thread | streams-wordcount2-1855c093-5482-4874-81c0-8daedb62684b-StreamThread-1-producer] DEBUG Metrics:335 - Added sensor with name topic.OUT_TOPIC.compression-rate
2018-02-27 10:23:05.288 [kafka-producer-network-thread | streams-wordcount2-1855c093-5482-4874-81c0-8daedb62684b-StreamThread-1-producer] DEBUG Metrics:335 - Added sensor with name topic.OUT_TOPIC.record-retries
2018-02-27 10:23:05.288 [kafka-producer-network-thread | streams-wordcount2-1855c093-5482-4874-81c0-8daedb62684b-StreamThread-1-producer] DEBUG Metrics:335 - Added sensor with name topic.OUT_TOPIC.record-errors

Somehow, having a flatMapValues and a groupBy declared in the stream makes the app do none of the above at all; it never talks to the output topic.

I am running https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java line for line, except for the topic names.

prateek nigam

unread,
Apr 20, 2020, 4:01:22 PM4/20/20
to Confluent Platform
Were you able to find the problem? I am also not able to see any messages in the output topic,
but there are no exceptions on the console.

Lisa Duchesne

unread,
Oct 23, 2020, 3:32:46 PM10/23/20
to Confluent Platform
Has anyone discovered the cause of this error? I am also seeing it in a Kafka Streams application while trying to left-join two topics and then write the stream to an output topic.

Matthias J. Sax

unread,
Oct 23, 2020, 6:42:45 PM10/23/20
to confluent...@googlegroups.com
One more thought:

Do your records have non-null keys and values? Records with a `null` key
or value are dropped by a join.
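A minimal sketch of the check you could apply before the join: the predicate below is the same lambda you would pass to KStream#filter (e.g. stream.filter((k, v) -> k != null && v != null).join(...)); the key/value strings are made-up examples:

```java
import java.util.function.BiPredicate;

public class NotNullFilterSketch {
    // The same predicate you would pass to KStream#filter before joining
    static final BiPredicate<String, String> NOT_NULL =
            (key, value) -> key != null && value != null;

    public static void main(String[] args) {
        System.out.println(NOT_NULL.test("user-1", "payload")); // true  -> record is kept
        System.out.println(NOT_NULL.test(null, "payload"));     // false -> dropped by the join anyway
    }
}
```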

-Matthias

Lisa Duchesne

unread,
Oct 23, 2020, 8:31:48 PM10/23/20
to Confluent Platform
The topics will have both null and non-null keys. Initially the key was not set, so older messages have null keys, but changes have now been made to set the key, so the new messages, which are the ones we care about going forward, will have a key.

I have tried a KStream-KStream join and also a KStream-GlobalKTable join - no luck so far.

Matthias J. Sax

unread,
Oct 24, 2020, 1:21:18 PM10/24/20
to confluent...@googlegroups.com
What Serdes format do you use?

Note that joins operate on byte[] comparison. If the two inputs use a
different byte representation (for example, different schema IDs for Avro
records in combination with Schema Registry), the join might not work.

-Matthias

Lisa Duchesne

unread,
Oct 26, 2020, 9:59:07 AM10/26/20
to Confluent Platform
I am using Serdes.String() for both streams. I actually have 8 topics to join, all Serdes.String(). Currently I am not using Avro to stream the topics. I am filtering the main topic and writing that filtered stream to an output topic, then trying to join the output topic with another topic, just to get it working for the moment. I have tried a KStream-KStream join and have also tried building a KTable from the output topic and then joining that KTable with another. None of these have worked.

What do you suggest? The data within the topics is actually JSON, but each topic has a different JSON structure. Meanwhile I will look into Avro.

Thank you.

Matthias J. Sax

unread,
Oct 26, 2020, 1:11:18 PM10/26/20
to confluent...@googlegroups.com
Well, if the data is JSON and you use the `String` serde, maybe it's a
whitespace issue? As the serialized bytes are compared, different white
space would result in different bytes, and thus the join may fail.

Btw: I am not saying that Avro will make it better... If your input data
is not Avro and you don't have any reason to work with Avro, just stay
with JSON.

Maybe using a proper JSON serde would fix the issue.
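A small sketch of why this matters: the same logical JSON with different whitespace serializes to different bytes under a String serde, so the join never sees matching keys (the JSON strings here are made-up examples):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class JsonKeyBytesSketch {
    public static void main(String[] args) {
        // Same logical JSON object, different formatting
        String compact = "{\"id\":1}";
        String spaced  = "{ \"id\": 1 }";

        byte[] a = compact.getBytes(StandardCharsets.UTF_8);
        byte[] b = spaced.getBytes(StandardCharsets.UTF_8);

        // A String serde compares these bytes verbatim, so the join
        // treats them as two different keys
        System.out.println(Arrays.equals(a, b)); // false
    }
}
```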

-Matthias