I'm new to Spark and I'm having a hard time doing a simple join of two DFs.
Intent:
- I'm receiving data from Kafka via a direct stream and would like to enrich the messages with data from Cassandra. The Kafka messages (Protobufs) are decoded into DataFrames and then joined with a (supposedly pre-filtered) DF from Cassandra. Each streaming batch holds only several messages while the raw C* table holds millions of rows, BUT the join always yields exactly ONE result [1:1] per message. After the join, the resulting DF is eventually stored in another C* table.
Problem:
- Even though I'm joining the two DFs on the full Cassandra primary key and pushing the corresponding filter to C*, Spark seems to load the whole C* data set into memory before actually joining (which is what I hoped the filter/predicate pushdown would prevent). This leads to a lot of shuffling and many spawned tasks, so the "simple" join takes forever...
As user zero323 on SO pointed out, using dynamic data as a predicate is problematic, but then I can't see how to efficiently perform a one-to-many join from C* without loading the world into memory.
Could anyone shed some light on this? As I see it, this should be a prime example for DFs and Spark Streaming.
Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2
Code:
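import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Lazily instantiated singleton instance of base_data DataFrame
 */
object base_data_df {
  @transient private var instance: DataFrame = _

  def getInstance(sqlContext: SQLContext): DataFrame = {
    if (instance == null) {
      // Load DataFrame with C* data-source
      instance = sqlContext.read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "cf", "keyspace" -> "ks"))
        .load()
    }
    instance
  }
}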
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.cassandra.connection.host", "xxx")
    .set("spark.cassandra.connection.keep_alive_ms", "30000")
    .setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.sparkContext.setLogLevel("INFO")

  // Initialise Kafka
  val kafkaTopics = Set[String]("xxx")
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
    "auto.offset.reset" -> "smallest")

  // Kafka stream
  val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)

  // Executed on the driver
  messages.foreachRDD { rdd =>
    // Create an instance of SQLContext
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    // Map MyMsg RDD: drop the Kafka key, keep the decoded message.
    // The bound name is lower-case because an upper-case name in a pattern
    // is matched as an existing value instead of binding a new variable.
    val MyMsgRdd = rdd.map { case (_, msg) => msg }

    // Convert RDD[MyMsg] to DataFrame
    val MyMsgDf = MyMsgRdd.toDF()
      .select(
        $"prim1Id" as 'prim1_id,
        $"prim2Id" as 'prim2_id,
        $...
      )

    // Load DataFrame from C* data-source
    val base_data = base_data_df.getInstance(sqlContext)

    // Left outer join on prim1_id and prim2_id
    // Note: isin receives a Column here rather than literal values
    // (the dynamic predicate discussed above)
    val joinedDf = MyMsgDf.join(base_data,
        MyMsgDf("prim1_id") === base_data("prim1_id") &&
        MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
      .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
        && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

    joinedDf.show()
    joinedDf.printSchema()

    // Select relevant fields
    // Persist
  }

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
Just a guess: would it be better to collect MyMsgDf before the join, and use the collected values instead of the DF in the isin predicate?
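A rough sketch of what that suggestion could look like, in case it helps. The helper name filterByCollectedKeys is made up for illustration; the column names prim1_id and prim2_id and the "left" join type are taken from the code above. Whether the connector actually turns these IN filters into CQL depends on its pushdown rules for partition and clustering columns, so that part is an assumption to verify (for example with explain()).

```scala
import org.apache.spark.sql.DataFrame

object CollectedKeyFilter {
  // Hypothetical helper (not from the original post): collect the batch's
  // key values on the driver and hand them to isin as literal values.
  def filterByCollectedKeys(msgs: DataFrame, base: DataFrame): DataFrame = {
    // A streaming batch holds only a few messages, so collecting the
    // distinct key values to the driver should be cheap.
    val prim1Keys: Array[Any] =
      msgs.select("prim1_id").distinct().collect().map(_.get(0))
    val prim2Keys: Array[Any] =
      msgs.select("prim2_id").distinct().collect().map(_.get(0))

    // isin now sees plain values instead of a Column from the other DF.
    val narrowed = base.filter(
      base("prim1_id").isin(prim1Keys: _*) &&
      base("prim2_id").isin(prim2Keys: _*))

    // The two IN lists are independent, so they admit every combination of
    // the collected keys; the join below keeps only the matching pairs.
    msgs.join(narrowed,
      msgs("prim1_id") === narrowed("prim1_id") &&
      msgs("prim2_id") === narrowed("prim2_id"), "left")
  }
}
```

Inside foreachRDD this would be called as CollectedKeyFilter.filterByCollectedKeys(MyMsgDf, base_data). It is probably worth skipping empty batches before calling it, since an empty key list gives an empty IN clause.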
--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.
I actually tried that but couldn't get it to work (which might also stem from my limited Spark/Scala skills).
Hi Russell,
Thanks for the clarification. I went down the joinWithCassandraTable path and got stuck at the point where I tried to bring the joinedRDD contents back into a DataFrame (see my other recent post).
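For anyone following along, here is a minimal sketch of that path: join the keys against the table with joinWithCassandraTable, then map the result into a case class so that toDF() works. It is not a confirmed solution. The case classes MsgKey and Enriched, the Long/String column types, and the "payload" column are all assumptions made up for illustration; only the keyspace "ks", the table "cf" and the key columns prim1_id and prim2_id come from the code earlier in the thread.

```scala
import com.datastax.spark.connector._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical shapes: the real column types are not given in the thread,
// and "payload" stands in for whatever C* columns are actually needed.
case class MsgKey(prim1_id: Long, prim2_id: Long)
case class Enriched(prim1_id: Long, prim2_id: Long, payload: String)

object JoinBackToDf {
  def enrich(keys: RDD[MsgKey], sqlContext: SQLContext): DataFrame = {
    import sqlContext.implicits._

    keys
      // Fetches only the C* rows matching each key instead of scanning the table.
      .joinWithCassandraTable[CassandraRow]("ks", "cf")
      // Join on the full primary key, as in the original DataFrame join.
      .on(SomeColumns("prim1_id", "prim2_id"))
      // Each element is a (MsgKey, CassandraRow) pair; flatten it into a
      // case class so Spark can derive a schema for the DataFrame.
      .map { case (key, row) =>
        Enriched(key.prim1_id, key.prim2_id, row.getString("payload"))
      }
      .toDF()
  }
}
```

The case classes sit at the top level rather than inside a method so that the implicit conversion behind toDF() can see their fields. Inside foreachRDD, the keys RDD would come from mapping each decoded message to a MsgKey.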