Reading from Cassandra using Spark Streaming


耀

Sep 8, 2015, 3:12:47 AM
to DataStax Spark Connector for Apache Cassandra
Hello All,

I have a problem when I use Spark Streaming to read from Cassandra.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

Following the link above, I use

val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)

to select data from Cassandra, but Spark Streaming runs the query only once; I want it to keep querying at a 10-second interval.

My code is as follows. I look forward to your response.

Thanks!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue


object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("scala_streaming_test")
      .set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()

    val dstream = ssc.queueStream(rddQueue)

    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
  }
}

Russell Spitzer

Sep 8, 2015, 12:09:36 PM
to DataStax Spark Connector for Apache Cassandra
Only operations on DStreams are repeated every batch, and QueueStream is really only for testing: it requires you to place new RDDs in the queue yourself, from your own code. So your code would only work (and don't do this) if you had a thread that repeatedly placed new CassandraTable RDDs into the queue.

Most folks who use streaming use either CassandraConnector.withSessionDo or joinWithCassandraTable to use the elements of their DStream to fetch specific records from C*.
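A minimal sketch of the joinWithCassandraTable approach (not from the thread; `idStream` is a hypothetical DStream of key values, and `fname` is assumed to be the partition key of the `mykeyspace.users` table used earlier):

```scala
import com.datastax.spark.connector._

// idStream: DStream[String] of partition-key values arriving in each batch (hypothetical)
idStream
  .map(Tuple1(_))  // keys must be tuples matching the table's partition key
  .transform(_.joinWithCassandraTable("mykeyspace", "users"))
  .print()
```

Each batch then issues point lookups only for its own elements, instead of re-scanning the table.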


耀

Sep 9, 2015, 3:44:05 AM
to DataStax Spark Connector for Apache Cassandra
On Wednesday, September 9, 2015 at 12:09:36 AM UTC+8, Russell Spitzer wrote:
In fact, I want to read the table at each interval (such as every 10 seconds) to query records related to that time, but I am puzzled about how to create a DStream from Cassandra in Spark. The Spark-Cassandra-Connector doc says:
Create any of the available or custom Spark streams. The connector supports Akka Actor streams so far, but will be supporting many more in the next release. You can extend the provided TypedStreamingActor:
import com.datastax.spark.connector.streaming.TypedStreamingActor

val stream = ssc.actorStream[String](Props[TypedStreamingActor[String]], "stream", StorageLevel.MEMORY_AND_DISK)

Could you give me more details on how to create a stream and get data from Cassandra every 10 seconds?

Thanks very much!

Gerard Maas

Sep 9, 2015, 8:54:36 AM
to DataStax Spark Connector for Apache Cassandra
You can create a ConstantInputDStream with the CassandraRDD as input. ConstantInputDStream will provide the same RDD on each streaming interval, and by executing an action on that RDD you will trigger a materialization of the RDD lineage, leading to executing the query on Cassandra every time.

Make sure that the data being queried does not grow unbounded to avoid increasing query times and resulting in an unstable streaming process.

Something like this should do the trick (using your code as starting point):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD { rdd =>
  // any action will trigger the underlying Cassandra query; using collect for simple output
  println(rdd.collect.mkString("\n"))
}
ssc.start()
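One hedged way to keep the queried data bounded, since the ConstantInputDStream's where clause is fixed when the RDD is created, is to rebuild the RDD each batch with a sliding time predicate. This sketch (not from the thread) assumes a hypothetical `mykeyspace.events` table with a filterable timestamp column `event_time`:

```scala
import com.datastax.spark.connector._
import org.apache.spark.streaming.dstream.ConstantInputDStream

// Use an empty constant stream purely as a per-batch trigger.
val trigger = new ConstantInputDStream(ssc, ssc.sparkContext.emptyRDD[Int])

trigger.foreachRDD { _ =>
  val cutoff = new java.util.Date(System.currentTimeMillis() - 10000L)
  val recent = ssc.sparkContext
    .cassandraTable("mykeyspace", "events")  // hypothetical table
    .where("event_time > ?", cutoff)         // only the last 10 seconds
  println(recent.collect().mkString("\n"))
}
```

Each batch then reads a fixed-size time window instead of an ever-growing result set.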

Gerard Maas

Sep 9, 2015, 8:57:45 AM
to DataStax Spark Connector for Apache Cassandra
BTW, I noticed that this question was also posted to SO (http://stackoverflow.com/questions/32451614/reading-from-cassandra-using-spark-streaming/32480168#32480168)

I've provided the answer there as well for the benefit of future visitors.

耀

Sep 9, 2015, 11:48:05 PM
to DataStax Spark Connector for Apache Cassandra
On Wednesday, September 9, 2015 at 8:57:45 PM UTC+8, Gerard Maas wrote:
> BTW, I noticed that this question was also posted to SO (http://stackoverflow.com/questions/32451614/reading-from-cassandra-using-spark-streaming/32480168#32480168)
>
> I've provided the answer there as well for the benefit of future visitor.

It was very helpful for solving my problem; the question posted on StackOverflow was asked by me several days ago. There were previously few tips on how to read from Cassandra into streaming, so your answer is great.
Thanks again.