Here's a complete example using Spark Streaming. It might be helpful to others.
--
object NetworkWordCountAsKey {
def main(args: Array[String]) {
// Require <master>, <hostname> and <port>; otherwise print the usage text to stderr
// and exit with status 1. The usage text names this object (NetworkWordCountAsKey).
if (args.length < 3) {
  System.err.println(
    "Usage: NetworkWordCountAsKey <master> <hostname> <port>\n" +
      "In local mode, <master> should be 'local[n]' with n > 1")
  System.exit(1)
}
// State-update function passed to `updateStateByKey`: adds the values seen for a key
// in the current batch to the total carried in `state` (0 when there is no prior state).
// Each intermediate figure is printed to stdout.
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val batchTotal = values.sum
  println(s"currentCount: $batchTotal")
  val carriedOver = state.getOrElse(0)
  println(s"previousCount: $carriedOver")
  val runningTotal = Some(batchTotal + carriedOver)
  println(s"Cumulative: $runningTotal")
  runningTotal
}
// Grouping key for a (String, Int) pair: the Int component.
val keyByFunc: ((String, Int)) => Int = { case (_, count) => count }
// Build the streaming context with a 10-second batch interval; the Spark home and the
// examples jar are taken from the environment.
val sparkHome = System.getenv("SPARK_HOME")
val exampleJars = Seq(System.getenv("SPARK_EXAMPLES_JAR"))
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(10), sparkHome, exampleJars)
// Use "." as the checkpoint directory.
ssc.checkpoint(".")

// Read \n-delimited text from <hostname>:<port> (e.g. generated by 'nc') and split
// every line into space-separated words.
val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))

// Pair each word with 1, fold the pairs into per-word state via updateFunc, then for
// every batch group the resulting (word, count) pairs by keyByFunc and print each group.
val wordOnes = words.map(word => (word, 1))
val runningCounts = wordOnes.updateStateByKey[Int](updateFunc)
runningCounts.foreach { rdd =>
  rdd.groupBy[Int](keyByFunc).foreach(group => println("t: " + group))
}

ssc.start()