Loading a Spark SQL DataFrame into Druid


Vinay Shetty

Aug 13, 2018, 5:40:36 PM
to Druid User
Hi All,

I have the following use case:

Use Spark Streaming to read JSON data from Kafka, batch the data into 10-minute intervals, and run some aggregations with Spark SQL. At that point I have a DataFrame.

I need to load this DataFrame into Druid. Can you please help me with how to do that?

I read about Tranquility, but I think it can only load DStream RDDs into Druid (and not DataFrames that are created from processing DStreams).


Sample code:

var sparkSession = StreamingContextBuilder.sparkSessionBuilder(appName)
val ssc = StreamingContextBuilder.getStreamingContext(sparkSession, aggregationTime)
val kafkaParams = StreamingContextBuilder.setKafkaBroker(StreamingCfg.kafkaBroker)
val messages = StreamingContextBuilder.getKafkaMessages(ssc, kafkaTopics, kafkaParams)

val jsonStr = messages
  .map(_._2)
  .foreachRDD((rdd, time: Time) => {
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import org.apache.spark.sql.functions._

    val jsonSchema = scala.io.Source.fromFile(schemaFile).mkString
    val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
    val df = spark.read.schema(schema).json(rdd)
    df.withColumn("date_key", TimeUtil.getDateTimeWindow(col(timestampField), lit((aggregationTime / 60)), lit(timeMeasure)))
      .filter(col("date_key") >= time.toString())
      .cache()
      .createOrReplaceTempView(viewName)
    val aggQuery = sqlQuery
    spark.sql("set spark.sql.caseSensitive=true")
    val tmp = spark.sql(aggQuery)
  })

ssc

In the above example, I need to store the DataFrame "tmp" in a Druid datasource. Basically, every 10 minutes I will have new data in "tmp" that I need to load into Druid.

I would appreciate any help.

Thank you,
Vinay

Vinay Shetty

Aug 14, 2018, 1:06:48 PM
to Druid User
BUMP

Druid_novice

Aug 16, 2018, 11:01:18 PM
to Druid User
Got it. Convert your df into an RDD of some case class, and then you can use the propagate function of Tranquility on that RDD.
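
A rough sketch of that approach, in case it helps. The case class AggEvent, its fields, AggEventBeamFactory and DruidSink are hypothetical names made up for illustration; the fields must match the columns your aggregation query actually returns. The Tranquility package paths are as I remember them from the tranquility-spark module, so check them against the version you use. Building the Beam itself (with DruidBeams.builder and your ZooKeeper, datasource, dimension and aggregator settings) is deliberately left unimplemented here.

```scala
import com.metamx.tranquility.beam.Beam
import com.metamx.tranquility.spark.BeamFactory
import com.metamx.tranquility.spark.BeamRDD._ // adds propagate() to RDDs
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical event type: one field per column of the aggregated DataFrame.
case class AggEvent(date_key: String, dimension: String, metric: Long)

// Hypothetical factory. makeBeam is called on the executors, so it hands out
// a per-JVM singleton instead of creating a new connection for every call.
class AggEventBeamFactory extends BeamFactory[AggEvent] {
  override def makeBeam: Beam[AggEvent] = AggEventBeamFactory.beam
}

object AggEventBeamFactory {
  // Assumption: build this with DruidBeams.builder(...) for your own cluster.
  // Left as ??? on purpose, so it throws NotImplementedError until filled in.
  lazy val beam: Beam[AggEvent] = ???
}

object DruidSink {
  // Call this inside foreachRDD, right after computing the aggregated DataFrame.
  def write(spark: SparkSession, tmp: DataFrame): Unit = {
    import spark.implicits._ // provides the Encoder for the case class

    // DataFrame -> typed Dataset -> RDD of case class instances.
    val events: RDD[AggEvent] = tmp.as[AggEvent].rdd

    // Tranquility sends each partition's events to Druid through the Beam.
    events.propagate(new AggEventBeamFactory)
  }
}
```

In the sample code above this would be called as DruidSink.write(spark, tmp) at the end of the foreachRDD block, so each 10-minute batch is pushed once its aggregation has run.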