Example of group by value

sku...@lucirix.com

Apr 5, 2013, 10:43:53 AM
to spark...@googlegroups.com
I am new to Spark and was wondering how to go about doing a group by value.

For example, assume my RDD has (key, value) pairs of word counts: {a, 2}, {b, 2}, {c, 1} and {d, 4}.

I want it to give me 1 -> (c), 2 -> (a, b) and 4 -> (d), i.e. transform the RDD I have into another RDD that maps the other way, from each count to the words with that count.

I believe this falls under the collating map-reduce paradigm, but I need something concrete (a code example) to achieve what I want.

I would appreciate some help and in return will submit an example for others to consume.

cheers
Kumar

Reynold Xin

Apr 5, 2013, 10:50:24 AM
to spark...@googlegroups.com
val rdd = sc.parallelize(Seq(("a", 2), ("b", 2), ("c", 1), ("D", 4)))
rdd.map(_.swap).groupByKey().collect().foreach(println)
(1,ArrayBuffer(c))
(2,ArrayBuffer(a, b))
(4,ArrayBuffer(D))
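
For anyone who wants to try the idea without a SparkContext, here is a minimal sketch of the same swap-then-group step on plain Scala collections. The names GroupByValueSketch and groupByValue are made up for illustration and are not part of Spark; the collection type printed will also differ from the ArrayBuffer shown above.

```scala
// Plain-collections sketch of "swap, then group by key"; no Spark required.
// GroupByValueSketch and groupByValue are hypothetical names for illustration.
object GroupByValueSketch {
  // Invert (key, value) pairs into a map from each value to the keys that had it.
  def groupByValue[K, V](pairs: Seq[(K, V)]): Map[V, Seq[K]] =
    pairs
      .map(_.swap)                                   // (word, count) => (count, word)
      .groupBy(_._1)                                 // count -> Seq((count, word), ...)
      .map { case (v, kvs) => v -> kvs.map(_._2) }   // keep only the words

  def main(args: Array[String]): Unit = {
    val counts = Seq(("a", 2), ("b", 2), ("c", 1), ("D", 4))
    // Sort by count so the groups print in a stable order.
    groupByValue(counts).toSeq.sortBy(_._1).foreach(println)
  }
}
```

On an RDD, groupByKey plays the role of the groupBy and map steps here, which is why the one-liner above only needs the swap in front of it.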

--
Reynold Xin, AMPLab, UC Berkeley



sku...@lucirix.com

Apr 5, 2013, 3:27:34 PM
to spark...@googlegroups.com
Thanks!

Here's a complete example with Spark Streaming. It might be helpful to others.

--

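// Imports assume the pre-0.8 package layout (spark.*);
// later releases moved these under org.apache.spark.*
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

object NetworkWordCountAsKey {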
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: NetworkWordCountAsKey <master> <hostname> <port>\n" +
        "In local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      println("currentCount: " + currentCount)

      val previousCount = state.getOrElse(0)
      println("previousCount: " + previousCount)

      val cumulative = Some(currentCount + previousCount)
      println("Cumulative: " + cumulative)

      cumulative
    }

    val keyByFunc = (tuple: (String, Int)) => {
      tuple._2
    }

    // Create the context with a 10 second batch size
    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(10),
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(1), args(2).toInt)
    val words = lines.flatMap(_.split(" "))
    words.map(x => (x, 1)).updateStateByKey[Int](updateFunc).foreach(m => {
      m.groupBy[Int](keyByFunc).foreach(t => {
        println("t: " + t)
      })
    })

    ssc.start()
  }
}
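
To make the per-batch behaviour easier to follow, here is a rough sketch that mimics the update function and the final grouping on plain Scala collections, with no streaming context involved. UpdateStateSketch, step and byCount are hypothetical names, and the way step feeds every known key to the update function is only my approximation of what updateStateByKey does per key.

```scala
// Plain-collections sketch of the running count plus group-by-count.
// UpdateStateSketch, step and byCount are hypothetical names for illustration.
object UpdateStateSketch {
  // Same shape as updateFunc in the example above, minus the debug printing.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Fold one batch of words into the running state, calling updateFunc once per key.
  def step(state: Map[String, Int], batch: Seq[String]): Map[String, Int] = {
    // Equivalent of words.map(x => (x, 1)) grouped by word: word -> Seq(1, 1, ...)
    val newValues = batch.groupBy(identity).map { case (w, ws) => w -> ws.map(_ => 1) }
    val keys = state.keySet ++ newValues.keySet
    keys.flatMap { k =>
      updateFunc(newValues.getOrElse(k, Seq.empty), state.get(k)).map(c => k -> c)
    }.toMap
  }

  // Group words by their cumulative count, like m.groupBy[Int](keyByFunc).
  def byCount(state: Map[String, Int]): Map[Int, Set[String]] =
    state.groupBy(_._2).map { case (c, m) => c -> m.keySet }

  def main(args: Array[String]): Unit = {
    val afterBatch1 = step(Map.empty, Seq("a", "b", "a"))
    val afterBatch2 = step(afterBatch1, Seq("b", "c"))
    byCount(afterBatch2).foreach(t => println("t: " + t))
  }
}
```

After the second batch, "a" and "b" both have a cumulative count of 2 and "c" has 1, so the grouping holds two entries.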

Mansoor Rahim

Apr 24, 2013, 6:22:38 AM
to spark...@googlegroups.com
Hi sku..,
I am new to Spark. Could you explain how to compile and execute this code?

Thanks
Mansoor

sku...@lucirix.com

May 3, 2013, 1:18:37 AM
to spark...@googlegroups.com
Sorry for the late reply (I did not see this post).

Put the file in the examples folder, then run sbt package from the command line in the top-level Spark folder.

This will build the examples as well.

Then invoke it the usual way: ./run local[2] ...