Kafka Streams with JSON


vivek sharma

Feb 8, 2017, 8:11:17 PM
to Confluent Platform
Greetings all - Question: how can I take an array of several JSONs, filter some of them out based on the values of certain fields (by comparing keys and values in the JSON), and publish the filtered JSONs to corresponding topics?

FYI - I am using Kafka Streams 0.10.0.0 with Kafka 0.10.0.0

Matthias J. Sax

Feb 9, 2017, 1:08:39 AM
to confluent...@googlegroups.com
The question is a little open-ended...

Did you have a look at KStream#filter(), or maybe KStream#mapValues()
or KStream#flatMap()?

What did you try so far? Where did you get stuck?


-Matthias

vivek sharma

Feb 9, 2017, 10:58:29 AM
to Confluent Platform
Thanks Matthias, I was exploring which method would serve my purpose. So far I have experimented with filter and filterNot as follows, but somehow it is not producing the expected output:

Here I am trying to get all the JSON records in which the BusinessProcessName field value is test, and to put those JSONs into testTopic. I need to do the same operation to push the JSONs to other topics on the basis of the BusinessProcessName field; the input will be an array of JSONs with different BusinessProcessName values. FYI - I am also struggling because I do not know the syntactically correct methods to cover my requirement. Right now I am using stringSerde; with jsonSerde I got a NullPointerException, not sure why, but I am assuming it is due to the 0.10.0 version?

textLines.filter((BusinessProcessName, test) -> BusinessProcessName != test).to(stringSerde, stringSerde, "testTopic");
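For reference, a hedged sketch of what the intended filter could look like against the Kafka Streams 0.10 API, assuming string-serialized values, Jackson for the field lookup, and the topic names above; the class and helper names are made up for illustration. Two things differ from the snippet: the lambda parameters are the record's key and value (not the field name), and Strings are compared with equals(), since != only compares references.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class RouteByProcessName {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // True when the JSON value's BusinessProcessName field equals `wanted`.
    static boolean hasProcessName(String jsonValue, String wanted) {
        try {
            return wanted.equals(
                    MAPPER.readTree(jsonValue).path("BusinessProcessName").asText());
        } catch (Exception e) {
            return false; // treat malformed JSON as a non-match
        }
    }

    public static void main(String[] args) {
        Serde<String> stringSerde = Serdes.String();
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines =
                builder.stream(stringSerde, stringSerde, "inputTopic"); // topic name is illustrative

        // One filter per target topic; repeat with other names to route
        // the remaining BusinessProcessName values to their topics.
        textLines.filter((key, value) -> hasProcessName(value, "test"))
                 .to(stringSerde, stringSerde, "testTopic");
    }
}
```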



vivek sharma

Feb 10, 2017, 12:35:35 PM
to Confluent Platform
Another question on this: I am assuming that I will also need a model class for my input JSON? If yes, how will the model get the values from the input JSON? Is this process built in, or do I need to set the model fields manually in code?
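A small sketch of how the binding works, assuming Jackson's ObjectMapper (which the Streams JSON examples use): a model class is needed, but filling it is built in - readValue() matches JSON keys to field names (case-sensitively), so nothing is set manually. The class and field names here are illustrative.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class ModelBindingDemo {

    // Model (POJO) for the input JSON; field names must match the JSON keys.
    public static class BusinessProcess {
        public String businessProcessName;
        public String iKey;
    }

    static BusinessProcess parse(String json) throws Exception {
        // Jackson populates the fields automatically by name.
        return new ObjectMapper().readValue(json, BusinessProcess.class);
    }

    public static void main(String[] args) throws Exception {
        BusinessProcess bp = parse("{\"businessProcessName\":\"test\",\"iKey\":\"42\"}");
        System.out.println(bp.businessProcessName); // prints "test"
    }
}
```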

Guozhang Wang

Feb 10, 2017, 3:15:14 PM
to Confluent Platform

vivek sharma

Feb 10, 2017, 6:47:03 PM
to Confluent Platform
Thanks Guozhang for your quick help on this. Somehow I am getting the below error after using your approach to serialize and deserialize; I wonder if you have ever faced this one and have any idea:

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "BusinessProcessName" (class BusinessProcess), not marked as ignorable (one known property: "businessProcessName"])
 at [Source: [B@61d38298; line: 1, column: 27] (through reference chain: BusinessProcess["BusinessProcessName"])
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "BusinessProcessName" (class BusinessProcess), not marked as ignorable (one known property: "businessProcessName"])
 at [Source: [B@61d38298; line: 1, column: 27] (through reference chain: BusinessProcess["BusinessProcessName"])
        at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:51)
        at com.fasterxml.jackson.databind.DeserializationContext.reportUnknownProperty(DeserializationContext.java:839)
        at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1045)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1352)
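The error above is a case mismatch: the JSON key is "BusinessProcessName" (capital B) while the POJO field is businessProcessName. A hedged sketch of one fix - bind the capitalized JSON name explicitly with @JsonProperty rather than ignoring it (class names are illustrative):

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PropertyNameDemo {

    public static class BusinessProcess {
        @JsonProperty("BusinessProcessName") // map the capitalized JSON key onto this field
        public String businessProcessName;
    }

    static BusinessProcess parse(String json) throws Exception {
        return new ObjectMapper().readValue(json, BusinessProcess.class);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parse("{\"BusinessProcessName\":\"test\"}").businessProcessName); // prints "test"
    }
}
```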

vivek sharma

Feb 10, 2017, 7:43:53 PM
to Confluent Platform

I used the below annotation on my class to get rid of the previous error; however, I am not sure why it is looking for BusinessProcessName when my property is named businessProcessName:
@JsonIgnoreProperties(value = "BusinessProcessName")
Now I am stuck on the error below:

Exception in thread "StreamThread-1" java.lang.ClassCastException: KafkaStreamsSpike$BusinessProcess cannot be cast to [B
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
        at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:77)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
        at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68)
        at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

vivek sharma

Feb 14, 2017, 2:50:45 PM
to Confluent Platform
Thank you guys for all your help. I finally got rid of all those issues, but now I want to know how to handle an array of JSONs in my code. I changed the deserializer code as below:

TypeFactory typeFactory = objectMapper.getTypeFactory();
CollectionType collectionType =
        typeFactory.constructCollectionType(Collection.class, tClass);

data = objectMapper.readValue(bytes, collectionType);

But I am not sure how to handle a Collection with the branch method. Does anyone have any thoughts or suggestions on how to deal with this situation? It has really blocked me!
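For reference, a standalone sketch of the CollectionType approach above, assuming Jackson and an illustrative POJO; using List rather than the raw Collection gives an ordered, indexable result that is easy to split into individual records afterwards.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;

import java.util.List;

public class ArrayDeserDemo {

    public static class BusinessProcess {
        public String businessProcessName;
    }

    static final ObjectMapper MAPPER = new ObjectMapper();

    // Deserialize a whole JSON array into a typed List in one call.
    static List<BusinessProcess> parseList(byte[] bytes) throws Exception {
        CollectionType listType = MAPPER.getTypeFactory()
                .constructCollectionType(List.class, BusinessProcess.class);
        return MAPPER.readValue(bytes, listType);
    }

    public static void main(String[] args) throws Exception {
        List<BusinessProcess> list = parseList(
                "[{\"businessProcessName\":\"test\"},{\"businessProcessName\":\"audit\"}]".getBytes());
        System.out.println(list.size()); // prints 2
    }
}
```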

vivek sharma

Feb 14, 2017, 7:40:31 PM
to Confluent Platform
Hi guys: has anyone faced the below error? The code works fine with 2 elements but throws an error if there are 3 in a JSON array:


Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input in VALUE_STRING
 at [Source: [B@69a7af80; line: 1, column: 8191]
 at [Source: [B@69a7af80; line: 1, column: 4066] (through reference chain: java.util.ArrayList[2]->BusinessProcess["iKey"])
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Unexpected end-of-input in VALUE_STRING
 at [Source: [B@69a7af80; line: 1, column: 8191]
 at [Source: [B@69a7af80; line: 1, column: 4066] (through reference chain: java.util.ArrayList[2]->BusinessProcess["iKey"])
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:388)
        at com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:348)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.wrapAndThrow(BeanDeserializerBase.java:1599)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:278)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:140)
        at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:287)
        at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:259)
        at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:26)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2959)
        at JsonPOJODeserializer.deserialize(JsonPOJODeserializer.java:70)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:42)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:78)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:138)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:304)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input in VALUE_STRING
 at [Source: [B@69a7af80; line: 1, column: 8191]
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:483)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportInvalidEOF(ParserMinimalBase.java:460)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._loadMoreGuaranteed(UTF8StreamJsonParser.java:2404)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2489)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishAndReturnString(UTF8StreamJsonParser.java:2469)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:315)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:31)
        at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:11)
        at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:499)
        at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:108)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:276)
        ... 13 more

Matthias J. Sax

Feb 15, 2017, 7:29:39 PM
to confluent...@googlegroups.com
Not sure.

Did you search the internet for the exception? It's not a Streams
exception, but related to JSON.

>> Caused by: com.fasterxml.jackson.databind.JsonMappingException:
>> Unexpected end-of-input in VALUE_STRING

-Matthias

vivek sharma

Feb 17, 2017, 4:00:14 PM
to Confluent Platform

Yes, I searched the internet for this one and am really stuck on it. The funny thing is this error does not happen every time; it comes intermittently. When I use one or two JSON messages in the JSON array it works fine, but when I send three JSON messages in the array I get the errors below. I raised this issue with Jackson as well; not sure what is causing it. Do you know any other reliable library for serialization/deserialization (I do not mean that Jackson is unreliable)? JSON issues have already killed a lot of my time and I still cannot find the cause. If the same code can handle 2 JSON messages in an array, why can it not handle 3? :(

com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between Array entries
 at [Source: [B@43fd9452; line: 1, column: 5313]

com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.util.ArrayList out of START_OBJECT token

Matthias J. Sax

Feb 18, 2017, 10:08:17 PM
to confluent...@googlegroups.com
Not sure.

Kafka also uses Jackson internally:


https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java

https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java

-Matthias

vivek sharma

Feb 19, 2017, 9:29:40 PM
to Confluent Platform
Thanks Matthias for your persistence in helping me out.

While researching this issue I moved from Jackson to Gson in the hope of getting rid of it. I did not get rid of it, but I learned that Kafka Streams is not consuming the whole JSON array when I send multiple JSONs. With one or two JSONs it works, but with more than two it takes only part of the array, and that is why Gson throws a malformed-JSON exception; that makes sense to me because it does not see a complete JSON array. On a side note, my brokers have the complete data.

Now the big question is why Kafka Streams accepts only part of a JSON array with 3 JSON elements. Is there some character limit, or do I need to make it sleep for some time? I am really stuck here and not finding any help on the internet either; I am not sure how to go ahead from here. Any small idea/suggestion/help will be very much appreciated!

Guozhang Wang

Feb 19, 2017, 10:59:59 PM
to Confluent Platform
That is because the JsonSerde used in the streams examples expects each message to be a single JSON message, as its type is a single JsonNode.

You can always provide your own Serde to deserialize a byte array into a List<JsonNode> instead of a single JsonNode.
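A hedged sketch of the deserializer half of such a Serde, assuming Jackson and the kafka-clients Deserializer interface; the class name and error handling are illustrative, not the streams-examples code:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JsonArrayDeserializer implements Deserializer<List<JsonNode>> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public List<JsonNode> deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            JsonNode root = mapper.readTree(data);
            List<JsonNode> nodes = new ArrayList<>();
            if (root.isArray()) {
                for (JsonNode element : root) {
                    nodes.add(element); // one entry per array element
                }
            } else {
                nodes.add(root); // a single object is wrapped in a one-element list
            }
            return nodes;
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {}
}
```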


Guozhang

Nurcan Sonmez

Nov 9, 2017, 5:06:25 PM
to Confluent Platform
Vivek, it's been a while since you posted this question, but I am writing up my solution in case other people check this thread as well.

What I did was use the flatMapValues API to add every message in the array to the stream, then do the filtering and other processing. I used JsonPOJOSerde and JsonNode.

Here is a snippet from the code.

//the value is in the array format here
final KStream<String, JsonNode> mstream =
        builder.stream(stringSerde, jsonSerde, config.getMainTopic());

//this is how I create a single message for every element in the stream
final KStream<String, JsonNode> msstream =
        mstream.flatMapValues(new CustomValueMapper());

and this is the CustomValueMapper:

public class CustomValueMapper implements ValueMapper<JsonNode, Iterable<JsonNode>> {
    @Override
    public Iterable<JsonNode> apply(JsonNode node) {
        List<JsonNode> listOfNodes = new ArrayList<>();
        if (node.isArray()) {
            for (JsonNode singleNode : node) {
                listOfNodes.add(singleNode);
            }
        } else {
            listOfNodes.add(node);
        }
        return listOfNodes;
    }
}

I hope it'll help.

Sachin Kademane

Dec 5, 2017, 1:06:28 AM
to Confluent Platform
Can you please help me with the complete code so I can understand this?

muthuk...@skava.com

Jan 5, 2018, 9:26:31 AM
to Confluent Platform
I got the below error from a normal word-count program, so please help me:


[2018-01-05 14:15:00,272] ERROR stream-thread [StreamThread-1] Streams application error during processing:  (org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
[2018-01-05 14:15:00,332] WARN Unexpected state transition from RUNNING to NOT_RUNNING (org.apache.kafka.streams.processor.internals.StreamThread)
Exception in thread "StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
[2018-01-05 14:15:00,334] WARN Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)



In my source code:

public class KafkaEventStream
{

    public static void main(final String[] args) throws Exception
    {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream("streams-plaintext-input");
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count("Counts");
        wordCounts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

}

Matthias J. Sax

Jan 5, 2018, 4:28:06 PM
to confluent...@googlegroups.com
Not sure why it fails.

The code seems correct -- you set the String serde for key and value in the
config... (or do you need to pass `Serdes.String().getClass().getName()`?)

Does KafkaStreams pick up these configs correctly? You can find the configs
that are used in the logs.

You could try to set the serdes in `builder.stream` explicitly -- but
this should not be necessary.
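A hedged sketch of that suggestion against the same pre-1.0 KStreamBuilder API as the code above: the same word-count topology, but with the serdes passed explicitly to builder.stream() and the serde class *names* put into the config. Whether the explicit class names are actually required here is an open question, as noted above.

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class ExplicitSerdeWordCount {

    static KStreamBuilder buildTopology() {
        KStreamBuilder builder = new KStreamBuilder();
        // Explicit serdes here override whatever the default-serde config resolves to.
        KStream<String, String> textLines =
                builder.stream(Serdes.String(), Serdes.String(), "streams-plaintext-input");
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count("Counts");
        wordCounts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
        return builder;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        // Serde class names as strings, in case the Class objects are not picked up.
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // new KafkaStreams(buildTopology(), config).start() would run this against a broker.
    }
}
```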


-Matthias