How to parse JSON-formatted Kafka messages in Spark Streaming


Cui Lin

Mar 3, 2015, 7:06:48 PM
to spark-conn...@lists.datastax.com
Dear all,

I'm trying to parse JSON-formatted Kafka messages and then send them back to Cassandra. I have two problems:
1. How do I parse the JSON messages from the streams? I was trying to use JsonRDD but it's not working...

2. How do I get all column names from the JSON messages?

I don't know why my code is not working:
val streams = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, storageLevel)
streams.foreachRDD { rdd =>
  for (item <- rdd.collect().toArray) {
    System.out.println(item)
  }
}



--
Best regards!

Lin,Cui

Helena Edelson

Mar 3, 2015, 9:18:22 PM
to spark-conn...@lists.datastax.com
Hi Lin,

I am putting together a sample for you.
One note, your code is probably not working because you have not yet started the streaming context.

streamingContext.start()

and you need to declare your streaming sink (output) prior to starting the context.
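
Roughly, the shape would be something like this (a minimal sketch reusing the names from your snippet; printing is just for illustration):

// 1. declare the output (sink) first
streams.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}

// 2. then start the context and block until it terminates
ssc.start()
ssc.awaitTermination()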

- Helena
@helenaedelson

Helena Edelson

Mar 3, 2015, 10:57:45 PM
to spark-conn...@lists.datastax.com
Hi Lin,

Here is one way to do it (i.e., not the only way):
https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson.scala

You could dig into the JSON structure more with Spark SQL and/or json4s (for example). This is just a super-simple snippet.
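
For instance, a rough sketch of the Spark SQL route (this assumes a Spark 1.2-era SQLContext, that "stream" is the Kafka (key, value) DStream from your snippet, and that the values are the raw JSON strings; the table name is illustrative):

val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)

stream.map { case (_, json) => json }.foreachRDD { rdd =>
  val schemaRdd = sqlContext.jsonRDD(rdd)   // infers the schema from the JSON documents
  schemaRdd.registerTempTable("raw_json")   // "raw_json" is an illustrative table name
  sqlContext.sql("SELECT * FROM raw_json").collect().foreach(println)
}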

I could have sworn there was a streaming json function but now I can't find it. Maybe someone else has a more elegant sample :)

Helena
@helenaedelson


Cui Lin

Mar 4, 2015, 1:58:58 AM
to spark-conn...@lists.datastax.com
Hi, Helena,

Your example is very helpful to me. I will test this tomorrow. 
Do you know of any API for retrieving column names from a JsonRDD? I have hundreds of columns, and it doesn't seem feasible to hardcode each one in SQL.




--
Best regards!

Lin,Cui

Cui Lin

Mar 4, 2015, 2:24:21 AM
to spark-conn...@lists.datastax.com
Hi, Helena,

I'm not sure of the best way to handle the empty-collection situation when running:

kafkaStream.foreachRDD { rdd =>
  sqlContext.jsonRDD(rdd).registerTempTable("mytable")
}

I got the following exception right after submitting the application. Thanks!



Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:869)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:869)
at org.apache.spark.sql.json.JsonRDD$.inferSchema(JsonRDD.scala:57)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:232)
at org.apache.spark.sql.SQLContext.jsonRDD(SQLContext.scala:204)




--
Best regards!

Lin,Cui

Cui Lin

Mar 4, 2015, 7:49:13 PM
to spark-conn...@lists.datastax.com
Helena,

Besides the empty-collection issue, I'm checking with you whether cassandraTable is for sc or ssc, since your program at http://helenaedelson.com/?tag=apache-spark uses ssc instead.

sc.cassandraTable("githubstats", "monthly_commits")

Also, could you tell me which package to import for awaitCond? I tried import scala.concurrent.duration._ but it's not working...
awaitCond(table.collect.size > 1, 5.seconds)


Thanks so much for your help!!!


--
Best regards!

Lin,Cui

Helena Edelson

Mar 4, 2015, 9:44:49 PM
to spark-conn...@lists.datastax.com
Hi Lin,

That I do not know, other than what would be, for you, a very tedious row.getString(0), row.getInt(1), etc. from the Spark SQL API, or parsing with json4s or something.
I’d ask on the spark user list to see if anyone has a solution for your use case :)
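
For example (a rough sketch; the column positions and types are illustrative):

sqlContext.sql("SELECT * FROM mytable").map { row =>
  // positional access: you have to know each column's index and type up front
  (row.getString(0), row.getInt(1))
}.collect().foreach(println)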

Helena
tw:@helenaedelson

 


Helena Edelson

Mar 4, 2015, 9:51:37 PM
to spark-conn...@lists.datastax.com
Hi Lin,
You can use both sc.cassandraTable and ssc.cassandraTable; you need only add the import. Here are the docs describing this:

streaming:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

core:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
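
In code it looks roughly like this (a sketch, using the keyspace and table from your earlier example):

import com.datastax.spark.connector._             // adds cassandraTable to the SparkContext
import com.datastax.spark.connector.streaming._   // adds cassandraTable to the StreamingContext

val fromSc  = sc.cassandraTable("githubstats", "monthly_commits")
val fromSsc = ssc.cassandraTable("githubstats", "monthly_commits")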

The 'awaitCond' function is in the assertions trait of the spark-cassandra-connector-embedded artifact.
Note this is used for testing, demos, and quick prototyping, so it does include an embedded Kafka/ZooKeeper; just FYI for dependency awareness ;)
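
If you want to pull it in, the dependency looks roughly like this (a sketch; the version shown is only an example, match it to your connector version):

// build.sbt -- the artifact name is as mentioned above; the version is illustrative
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-embedded" % "1.2.0" % "test"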

I will take a look at your error.
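
In the meantime, one common workaround (just a sketch, and assuming kafkaStream already carries the JSON strings) is to skip empty micro-batches, since jsonRDD infers the schema with a reduce over the RDD, and that is what blows up on an empty batch:

kafkaStream.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {   // skip batches with no messages
    sqlContext.jsonRDD(rdd).registerTempTable("mytable")
  }
}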

Helena
https://twitter.com/helenaedelson

Helena Edelson

Mar 5, 2015, 10:24:00 AM
to spark-conn...@lists.datastax.com
Hi Cui,

So I posted the updated sample for you here: https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/KafkaStreamingJson2.scala

Helena


Cui Lin

Mar 5, 2015, 5:52:29 PM
to spark-conn...@lists.datastax.com
Helena,

Could you let me know the link to where MonthlyCommits is defined? Can I put any column name into extract[column_name]?

.map{ case (_,v) => JsonParser.parse(v).extract[MonthlyCommits]}




--
Best regards!

Lin,Cui

Helena Edelson

Mar 5, 2015, 7:22:11 PM
to spark-conn...@lists.datastax.com
It's just a simple case class with an added apply to create it inline in the map function:
https://github.com/killrweather/killrweather/blob/master/killrweather-examples/src/main/scala/com/datastax/killrweather/SampleJson.scala#L61

The way it works is the column names map to the case class fields.
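
For example, roughly (the field names here are illustrative, not the actual MonthlyCommits definition):

import org.json4s._
import org.json4s.native.JsonParser

// JSON keys "user", "commits", "year", "month" map onto these fields by name
case class MonthlyCommits(user: String, commits: Int, year: Int, month: Int)

stream.map { case (_, v) =>
  implicit val formats = DefaultFormats   // json4s needs Formats in scope for extract
  JsonParser.parse(v).extract[MonthlyCommits]
}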

Helena
https://twitter.com/helenaedelson

Helena Edelson

Mar 5, 2015, 7:23:22 PM
to spark-conn...@lists.datastax.com
Actually, I don't even use the apply; it's just the straight case-class field name to column name implicit mapping.

Cui Lin

Mar 5, 2015, 8:07:28 PM
to spark-conn...@lists.datastax.com
Helena,

In your example, the JSON object contains only 4 fields and its field names are known.
In my case, the jsonRDD could contain hundreds of fields, and its field names can't be defined in the code. Do you know how to deal with this case?

Helena Edelson

Mar 7, 2015, 11:00:34 AM
to spark-conn...@lists.datastax.com
Hi,

For reads, you could use this to create your rows if you know the column name -> value mapping at runtime:
https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala#L101
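
For example, something like this (a sketch, assuming that link points at the fromMap factory; the column names and values here are made up):

import com.datastax.spark.connector.CassandraRow

// build a row from a runtime map of column name -> value
val row = CassandraRow.fromMap(Map("user" -> "helena", "commits" -> 42))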

But for writes, this kind of thing does require defining that this function is specifically for a stream of data type X:
stream.map{ case (_,v) => JsonParser.parse(v).extract[MonthlyCommits]}

Read http://json4s.org
Helena

Keesjan de Vries

Apr 20, 2016, 11:10:48 AM
to DataStax Spark Connector for Apache Cassandra, helena....@datastax.com
Hi Helena,


I really wanted to do this sort of thing:

stream.map{ case (_,v) => JsonParser.parse(v).extract[MonthlyCommits]}

But I ran into this problem:
http://stackoverflow.com/questions/24786377/notserializableexception-with-json4s-on-spark

DefaultFormats seems to be non-serializable in json4s 3.2.10, which seems to be the version used by Spark (I get incompatibilities when I compile against 3.3.0).

You would really help me with advice on this.

Best,
Kees Jan

Jaroslaw Grabowski

Apr 20, 2016, 11:35:42 AM
to spark-conn...@lists.datastax.com, helena....@datastax.com
Hi,

try this https://github.com/json4s/json4s/issues/137 and see if it helps.
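
A common workaround for that NotSerializableException (just a sketch, not necessarily what the issue recommends) is to keep DefaultFormats out of the serialized closure, e.g. by creating the formats inside the function that runs on the executors:

stream.map { case (_, v) =>
  implicit val formats = org.json4s.DefaultFormats   // created on the executor, never shipped from the driver
  org.json4s.native.JsonParser.parse(v).extract[MonthlyCommits]
}

(Marking the formats @transient lazy val on the enclosing class is another variant of the same idea.)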





--

JAROSLAW GRABOWSKI
Software Engineer






Swati Saini

Mar 23, 2017, 3:42:29 AM
to DataStax Spark Connector for Apache Cassandra, icecre...@gmail.com

Hi All,

I am running the following code and I'm having a problem converting the RDD into a DataFrame, because the RDD takes the schema of the first row when converting it into a DataFrame.


import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import json
from kafka import KafkaConsumer, KafkaClient

conf = pyspark.SparkConf()

conf.setMaster('mesos://local_host:5050')

sc = pyspark.SparkContext(conf=conf)

sqlContext = SQLContext(sc)

ssc = StreamingContext(sc, 2)
kafkaParams = {'metadata.broker.list': 'local_host:9092', 'auto.offset.reset': 'smallest'}

topics = ['new3']
kafka_stream = KafkaUtils.createDirectStream(ssc,topics,kafkaParams)

# each batch: parse the Kafka message value as JSON
parsed = kafka_stream.map(lambda (k, v): json.loads(v))
# note: toDF() infers a schema per batch from the parsed rows, and the resulting DataFrame is discarded here
parsed.foreachRDD(lambda rdd: rdd.toDF())
ssc.start()
ssc.awaitTermination()
