Hi All,
I have the following use case:
Use Spark Streaming to read JSON data from Kafka, batch the data into 10-minute intervals, and do some aggregations with Spark SQL. At that point I have a DataFrame.
I need to load this DataFrame into Druid. Can you please help me with how to do that?
I read about Tranquility, but I think it can only load DStream RDDs into Druid (not DataFrames created by processing those DStreams).
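One thing I was considering: since a DataFrame is just a wrapper around an RDD[Row], maybe I can convert the aggregated DataFrame ("tmp" in the code below) back to an RDD and call tranquility-spark's propagate on it. Something like the sketch below, where MyEventBeamFactory is a hypothetical BeamFactory[Map[String, Any]] I would still have to write (sketched further down) -- I am not sure getValuesMap produces what Tranquility expects:

// Sketch only -- assumes tranquility-spark is on the classpath.
import com.metamx.tranquility.spark.BeamRDD._

// Convert each Row into a Map[String, Any] keyed by column name.
val fieldNames = tmp.schema.fieldNames
val events = tmp.rdd.map(row => row.getValuesMap[Any](fieldNames))

// propagate is added to RDDs by the BeamRDD implicit; MyEventBeamFactory
// is a placeholder BeamFactory[Map[String, Any]] (see the sketch below).
events.propagate(new MyEventBeamFactory)

Is that a reasonable direction, or is there a better way?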
Sample code:
// Standard imports used by the snippet; StreamingContextBuilder,
// StreamingCfg, and TimeUtil are our own helpers.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.streaming.Time

val sparkSession = StreamingContextBuilder.sparkSessionBuilder(appName)
val ssc = StreamingContextBuilder.getStreamingContext(sparkSession, aggregationTime)
val kafkaParams = StreamingContextBuilder.setKafkaBroker(StreamingCfg.kafkaBroker)
val messages = StreamingContextBuilder.getKafkaMessages(ssc, kafkaTopics, kafkaParams)

messages
  .map(_._2)   // keep only the JSON payload of each Kafka (key, value) pair
  .foreachRDD { (rdd, time: Time) =>
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import org.apache.spark.sql.functions._

    // Parse the JSON strings using a schema loaded from a file.
    val jsonSchema = scala.io.Source.fromFile(schemaFile).mkString
    val schema = DataType.fromJson(jsonSchema).asInstanceOf[StructType]
    val df = spark.read.schema(schema).json(rdd)

    // Bucket each record into its 10-minute window and register the batch as a view.
    df.withColumn("date_key",
        TimeUtil.getDateTimeWindow(col(timestampField), lit(aggregationTime / 60), lit(timeMeasure)))
      .filter(col("date_key") >= time.toString())
      .cache()
      .createOrReplaceTempView(viewName)

    spark.sql("set spark.sql.caseSensitive=true")

    // tmp holds the aggregated result for this batch -- this is what I need in Druid.
    val tmp = spark.sql(sqlQuery)
  }
ssc
In the above example, I need to store the DataFrame "tmp" into a Druid datasource. Every 10 minutes there will be new data in "tmp", which I need to load into Druid.
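If it helps, here is roughly the BeamFactory I was planning to plug into the propagate call above, pieced together from the Tranquility README. The ZooKeeper connect string, discovery path, overlord service name, datasource, dimensions, metric, and timestamp field are all placeholders for my setup, and I have not verified that this compiles against my dependency versions:

import com.metamx.common.Granularity
import com.metamx.tranquility.beam.{Beam, ClusteredBeamTuning}
import com.metamx.tranquility.druid.{DruidBeams, DruidLocation, DruidRollup, SpecificDruidDimensions}
import com.metamx.tranquility.spark.BeamFactory
import io.druid.granularity.QueryGranularities
import io.druid.query.aggregation.LongSumAggregatorFactory
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.joda.time.{DateTime, Period}

class MyEventBeamFactory extends BeamFactory[Map[String, Any]] {
  // makeBeam runs on the executors; lazy so each JVM builds the beam once.
  lazy val makeBeam: Beam[Map[String, Any]] = {
    // Tranquility coordinates through ZooKeeper (via Curator).
    val curator = CuratorFrameworkFactory.newClient(
      "zk-host:2181",                                   // placeholder ZK connect string
      new BoundedExponentialBackoffRetry(100, 3000, 5)
    )
    curator.start()

    DruidBeams
      // Events are Map[String, Any]; "timestamp" is a placeholder for my time field.
      .builder((event: Map[String, Any]) => new DateTime(event("timestamp")))
      .curator(curator)
      .discoveryPath("/druid/discovery")                // overlord's druid.discovery.curator.path
      .location(DruidLocation.create("druid/overlord", "my-datasource"))  // placeholders
      .rollup(DruidRollup(
        SpecificDruidDimensions(IndexedSeq("dim1", "dim2")),              // placeholder dimensions
        Seq(new LongSumAggregatorFactory("metric1", "metric1")),          // placeholder metric
        QueryGranularities.MINUTE
      ))
      .tuning(ClusteredBeamTuning(
        segmentGranularity = Granularity.HOUR,
        windowPeriod = new Period("PT10M"),             // matches my 10-minute batches
        partitions = 1,
        replicants = 1
      ))
      .buildBeam()
  }
}

Does this look like the right shape, or is there a more direct way to get a DataFrame into Druid?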
I would appreciate any help.
Thank you,
Vinay