I have a problem when I use Spark Streaming to read from Cassandra.
As in the link above, I use
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
to select the data from Cassandra, but it seems that Spark Streaming runs the query only once, while I want it to keep querying at a 10-second interval.
My code is as follows; I'm looking forward to your response.
Thanks!
import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("scala_streaming_test")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
    val dstream = ssc.queueStream(rddQueue)
    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
  }
}
Make sure that the data being queried does not grow unbounded, otherwise query times will keep increasing and the streaming process will become unstable (one way to bound each batch is sketched after the code below).
Something like this should do the trick (using your code as a starting point):
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf, Seconds(10))
val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD { rdd =>
  // Any action will trigger the underlying Cassandra query;
  // collect is used here just to get a simple output.
  println(rdd.collect.mkString("\n"))
}
ssc.start()
ssc.awaitTermination()
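On the unbounded-growth point: one possible way to keep each batch bounded is to rebuild the Cassandra RDD inside foreachRDD with a where clause restricted to the last batch interval, instead of scanning the whole table every time. The sketch below is only an illustration under assumptions that are not in the original thread: it presumes a hypothetical table "events" with a partition key "sensor_id" and a clustering column "event_time" (timestamp), so that the time predicate can be pushed down to Cassandra; adjust the keyspace, table, and columns to your own schema.

import com.datastax.spark.connector._   // adds cassandraTable(...) to SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf, Seconds(10))

// A trivial constant RDD whose only purpose is to drive the 10-second batch schedule.
val ticks = new ConstantInputDStream(ssc, ssc.sparkContext.parallelize(Seq(0)))

ticks.foreachRDD { _ =>
  // Only read rows written during (roughly) the last batch interval.
  val since = new java.util.Date(System.currentTimeMillis() - 10 * 1000L)

  // Build a fresh, bounded query per batch. The predicate is pushed down to Cassandra
  // only if "event_time" is a clustering column and the partition key is restricted,
  // so the where clause must match your schema (hypothetical names here).
  val batch = ssc.sparkContext
    .cassandraTable("mykeyspace", "events")
    .select("sensor_id", "event_time", "value")
    .where("sensor_id = ? AND event_time > ?", "s1", since)

  println(batch.collect().mkString("\n"))
}
ssc.start()
ssc.awaitTermination()

The trade-off compared to the ConstantInputDStream solution above is that each batch issues a new, narrower query rather than re-running the same full-table scan, which keeps per-batch work roughly constant as the table grows.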
I've provided the answer there as well for the benefit of future visitors.