Joining Kafka and Cassandra DataFrames in Spark Streaming ignores C* predicate pushdown


Bernhard Pfund

Feb 10, 2016, 1:22:56 AM
to DataStax Spark Connector for Apache Cassandra
All,

I'm new to Spark and I'm having a hard time doing a simple join of two DFs.

Intent:
- I'm receiving data from Kafka via a direct stream and would like to enrich the messages with data from Cassandra. The Kafka messages (Protobufs) are decoded into DataFrames and then joined with a (supposedly pre-filtered) DF from Cassandra. The ratio of (Kafka) streaming batch size to raw C* data is several streaming messages to millions of C* rows, BUT the join always yields exactly ONE result (1:1) per message. After the join, the resulting DF is eventually stored to another C* table.

Problem:
- Even though I'm joining the two DFs on the full Cassandra primary key and pushing the corresponding filter to C*, it seems that Spark is loading the whole C* data set into memory before actually joining (which I'd like to prevent by using the filter/predicate pushdown). This leads to a lot of shuffling and tasks being spawned, so the "simple" join takes forever...

As user zero323 pointed out on SO, it's problematic to use dynamic data as a predicate, but then I can't see how to efficiently perform a one-to-many join from C* without loading the world into memory.

SO: http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p

Could anyone shed some light on this? In my view this should be a prime use case for DFs and Spark Streaming.

Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2

Code:

/**
 * Lazily instantiated singleton instance of base_data DataFrame
 */
object base_data_df {

  @transient private var instance: DataFrame = _

  def getInstance(sqlContext: SQLContext): DataFrame = {
    if (instance == null) {
      // Load DataFrame with C* data-source
      instance = sqlContext.read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "cf", "keyspace" -> "ks"))
        .load()
    }
    instance
  }
}

def main(args: Array[String]) {
  val conf = new SparkConf()
    .setAppName("test")
    .set("spark.cassandra.connection.host", "xxx")
    .set("spark.cassandra.connection.keep_alive_ms", "30000")
    .setMaster("local[*]")

  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.sparkContext.setLogLevel("INFO")

  // Initialise Kafka
  val kafkaTopics = Set[String]("xxx")
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
    "auto.offset.reset" -> "smallest")

  // Kafka stream
  val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](ssc, kafkaParams, kafkaTopics)

  // Executed on the driver
  messages.foreachRDD { rdd =>

    // Create an instance of SQLContext
    val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
    import sqlContext.implicits._

    // Map MyMsg RDD
    val MyMsgRdd = rdd.map { case (key, MyMsg) => (MyMsg) }

    // Convert RDD[MyMsg] to DataFrame
    val MyMsgDf = MyMsgRdd.toDF()
      .select(
        $"prim1Id" as 'prim1_id,
        $"prim2Id" as 'prim2_id,
        $...
      )

    // Load DataFrame from C* data-source
    val base_data = base_data_df.getInstance(sqlContext)

    // Inner join on prim1Id and prim2Id
    val joinedDf = MyMsgDf.join(base_data,
        MyMsgDf("prim1_id") === base_data("prim1_id") &&
        MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
      .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id"))
        && base_data("prim2_id").isin(MyMsgDf("prim2_id")))

    joinedDf.show()
    joinedDf.printSchema()

    // Select relevant fields

    // Persist

  }

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}

Qi Li

Feb 10, 2016, 2:51:22 AM
to DataStax Spark Connector for Apache Cassandra

Just a guess: would it be better if you collect MyMsgDf before the join, and use the resulting collection instead of the DF in the isin predicate?
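
For what it's worth, a rough sketch of that idea, reusing the column and table names from the original post. This is untested: it assumes the per-batch message set is small enough to collect to the driver, and whether the resulting IN filters actually reach Cassandra still depends on the connector's pushdown rules (CQL has historically restricted IN on partition key columns), so check joinedDf.explain(true) to see which filters made it to the source.

// Collect the (small) per-batch keys to the driver
val keys = MyMsgDf.select($"prim1_id", $"prim2_id").collect()
val prim1Keys = keys.map(_.getAs[Any]("prim1_id"))
val prim2Keys = keys.map(_.getAs[Any]("prim2_id"))

// isin against literal values is an ordinary source filter,
// unlike isin against a column of another DataFrame
val filteredBase = base_data
  .filter($"prim1_id".isin(prim1Keys: _*) && $"prim2_id".isin(prim2Keys: _*))

val joinedDf = MyMsgDf.join(filteredBase,
  MyMsgDf("prim1_id") === filteredBase("prim1_id") &&
  MyMsgDf("prim2_id") === filteredBase("prim2_id"))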



Bernhard Pfund

Feb 10, 2016, 3:28:56 AM
to DataStax Spark Connector for Apache Cassandra


I actually tried that but couldn't get it to work (which might also stem from my limited Spark/Scala skills).

Russell Spitzer

Feb 16, 2016, 1:59:38 PM
to DataStax Spark Connector for Apache Cassandra
This wouldn't be a case of predicate pushdown. This is a join on a partition key column. Currently only joinWithCassandraTable supports this kind of direct join, although we are working on some methods to have this done automatically within Spark.
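
For reference, a minimal sketch of what that looks like with the RDD API, reusing the names from the original post and assuming (prim1_id, prim2_id) is the partition key of ks.cf:

import com.datastax.spark.connector._

// One targeted C* read per key instead of a full table scan
val joined = MyMsgRdd
  .map(msg => (msg.prim1Id, msg.prim2Id))
  .joinWithCassandraTable("ks", "cf")
  .on(SomeColumns("prim1_id", "prim2_id"))

// joined is an RDD of ((key1, key2), CassandraRow) pairs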


Bernhard Pfund

Feb 16, 2016, 2:11:39 PM
to spark-conn...@lists.datastax.com

Hi Russell

Thanks for the clarification. I went down the joinWithCassandraTable path and got stuck at the point where I tried to bring the joined RDD contents back into a DataFrame (see my other recent post).
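
One way to get from the joined RDD back to a DataFrame is to map each (key, CassandraRow) pair into a case class and call toDF(). This is a sketch only: Enriched and the "some_value" column are made-up placeholders, and the case class should be defined at the top level rather than inside a method so toDF() can pick it up.

// Placeholder case class mirroring the columns needed downstream
case class Enriched(prim1Id: String, prim2Id: String, someValue: String)

import sqlContext.implicits._

val enrichedDf = joined
  .map { case (_, row) =>
    Enriched(row.getString("prim1_id"), row.getString("prim2_id"), row.getString("some_value"))
  }
  .toDF()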

Antonio Ye

May 1, 2017, 11:55:03 PM
to DataStax Spark Connector for Apache Cassandra
Is this still the case? Is there still no way to have a DataFrame join push down the filter when joining on the partition key(s)?

Russell Spitzer

May 2, 2017, 12:36:20 AM
to DataStax Spark Connector for Apache Cassandra
This is still the case; only by converting back to an RDD can you use joinWithCassandraTable.
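
In concrete terms (sketch only: someDf is a stand-in for whatever DataFrame holds the join keys, the key columns are assumed to be text, and prim1_id/prim2_id are again assumed to form the partition key of ks.cf):

import com.datastax.spark.connector._

// Drop to the RDD API just for the direct join
val keyRdd = someDf
  .select("prim1_id", "prim2_id")
  .rdd
  .map(r => (r.getAs[String]("prim1_id"), r.getAs[String]("prim2_id")))

val joinedRdd = keyRdd.joinWithCassandraTable("ks", "cf")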

Antonio Ye

May 2, 2017, 8:04:23 AM
to spark-conn...@lists.datastax.com
Thanks Russell. So that means I am out of luck if I want to do this from PySpark, since as I understand it the Cassandra RDD API is not available through Python, correct?

Russell Spitzer

May 2, 2017, 10:39:33 AM
to spark-conn...@lists.datastax.com
Yes, there is no longer anyone maintaining a PySpark version of the RDD interface.

Russell Spitzer
Software Engineer




