Spark Streaming: simulate input data and plot a graph of the output.


Aswin Roy

Jun 17, 2013, 4:10:31 AM
to spark...@googlegroups.com
I want to simulate real-time data and process it with Spark Streaming (e.g., word count), then display the output in a real-time graph. The graph should preferably be built with HTML and PHP. I am able to feed in data through a netcat server and obtain the word-count output through Spark Streaming. Now the challenge is to simulate real-time data and then plot a graph of the operation. Can someone guide me on how to do it? I am new to front-end tools like PHP.

Thanks.

Prashant Sharma

Jun 18, 2013, 5:05:51 AM
to spark...@googlegroups.com
Hi Aswin, 

What you are trying to do is interesting and non-trivial, and something a lot of people would like to do. There are many ways to go about it. I have already tried it with Swing, not on the web.
To go about it, you can start by looking at websockets and figure out a way to pull data through them. If you would like to play with some Scala code and the latest technologies, ready-made starter code is available open source. To get it working, try installing Typesafe Activator http://typesafe.com/activator (I am not trying to endorse it, only to help!).

When you try it out, one of the examples is reactive streams, which is what you would want as your starting point. Unfortunately I cannot help with PHP, but there should be an equivalent to it.

Thanks



Ian O'Connell

Jun 18, 2013, 12:07:23 PM
to spark...@googlegroups.com
While you could do something with streaming the data over websockets, it's pretty rare that a latency of 10 seconds or so would be an issue. If that's acceptable, a simple timer + AJAX request to update the graph is generally the easier option, and it integrates easily with plenty of JavaScript graphing solutions (e.g. Highcharts, which is free for open-source or personal use, IIRC).

Someone can correct me, but the easiest way I can think of to implement this would be:
1) A Spark Streaming backend to perform the transforms and aggregations
2) Atomic increments/updates issued to Redis, which also provides the persistence layer (see the sketch after this list)
3) A Play/Rails/PHP/Lift/Spring/etc. REST interface to query Redis
4) Static HTML + JS (served by (3), or just from a CDN or elsewhere) that queries the REST interface every N seconds for the updated count for the last data point
5) Interpolation, or an increased polling frequency, to make the graph look fancier...

(3) could be integrated into the same process as (1), but for any real-world application that's probably not a good idea for anything beyond internal metrics on how your tasks are performing, I wouldn't think.
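(To make step (2) concrete, here is a minimal sketch, assuming the Jedis Redis client is on the classpath and a wordCounts DStream of (word, count) pairs produced by the word-count job; the "wordcount:" key prefix is made up. foreachRDD and foreachPartition are the usual output operations in current Spark; very old versions named the first one foreach.)

import redis.clients.jedis.Jedis

// Push per-word counts into Redis with atomic increments (step 2).
wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val jedis = new Jedis("localhost", 6379) // one connection per partition
    partition.foreach { case (word, count) =>
      jedis.incrBy("wordcount:" + word, count) // INCRBY is atomic
    }
    jedis.disconnect()
  }
}

The REST layer in (3) then only has to read those keys back, and the page in (4) polls it.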



Aswin Roy

Jun 19, 2013, 5:13:44 AM
to spark...@googlegroups.com
Thanks for all the replies; they are definitely helpful. But here is one way that I find easy to do, and I need help with it as well.

The examples folder in spark-streaming has many easy examples. One of them, "ActorWordCount", both generates random messages and counts the words by itself. Here is the code:

package spark.streaming.examples

import scala.collection.mutable.LinkedList
import scala.util.Random

import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala

import spark.streaming.Seconds
import spark.streaming.StreamingContext
import spark.streaming.StreamingContext.toPairDStreamFunctions
import spark.streaming.receivers.Receiver
import spark.util.AkkaUtils

case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)

/**
 * Sends random content to every subscribed receiver, with a
 * half-second delay.
 */
class FeederActor extends Actor {

  val rand = new Random()
  var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()

  val strings: Array[String] = Array("words ", "may ", "count ")

  def makeMessage(): String = {
    val x = rand.nextInt(3)
    strings(x) + strings(2 - x)
  }

  /*
   * A thread to generate random messages
   */
  new Thread() {
    override def run() {
      while (true) {
        Thread.sleep(500)
        receivers.foreach(_ ! makeMessage)
      }
    }
  }.start()

  def receive: Receive = {

    case SubscribeReceiver(receiverActor: ActorRef) =>
      println("received subscribe from %s".format(receiverActor.toString))
      receivers = LinkedList(receiverActor) ++ receivers

    case UnsubscribeReceiver(receiverActor: ActorRef) =>
      println("received unsubscribe from %s".format(receiverActor.toString))
      // remove this receiver wherever it sits in the list
      // (dropWhile would only drop a matching prefix)
      receivers = receivers.filter(_ ne receiverActor)

  }
}

/**
 * A sample receiver actor, the simplest kind. It subscribes to
 * a typical publisher/feeder actor and receives data from it.
 *
 * @see [[spark.streaming.examples.FeederActor]]
 */
class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
extends Actor with Receiver {

  lazy private val remotePublisher = context.actorFor(urlOfPublisher)

  override def preStart = remotePublisher ! SubscribeReceiver(context.self)

  def receive = {
    case msg => context.parent ! pushBlock(msg.asInstanceOf[T])
  }

  override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)

}

/**
 * A sample feeder actor
 *
 * Usage: FeederActor <hostname> <port>
 *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
 */
object FeederActor {

  def main(args: Array[String]) {
    if(args.length < 2){
      System.err.println(
        "Usage: FeederActor <hostname> <port>\n"
      )
      System.exit(1)
    }
    val Seq(host, port) = args.toSeq


    val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
    val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")

    println("Feeder started as:" + feeder)

    actorSystem.awaitTermination();
  }
}

/**
 * A sample word count program demonstrating the use of plugging in
 * Actor as Receiver
 * Usage: ActorWordCount <master> <hostname> <port>
 *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
 *   <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
 *
 * To run this example locally, you may run Feeder Actor as
 *    `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
 * and then run the example
 *    `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
 */
object ActorWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        "Usage: ActorWordCount <master> <hostname> <port>\n" +
        "In local mode, <master> should be 'local[n]' with n > 1")
      System.exit(1)
    }

    val Seq(master, host, port) = args.toSeq

    // Create the context and set the batch size
    val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

    /*
     * The following uses actorStream to plug in a custom actor as receiver.
     *
     * An important point to note:
     * Since the actor may exist outside the Spark framework, it is the
     * user's responsibility to ensure type safety, i.e. the type of the
     * data received and the type parameter of the InputDStream should be
     * the same.
     *
     * For example: both actorStream and SampleActorReceiver are
     * parameterized with the same type to ensure type safety.
     */

    val lines = ssc.actorStream[String](
      Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
        host, port.toInt))), "SampleReceiver")

    //compute wordcount
    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()

    ssc.start()
  }
}

Is it possible for me to use this?

Here is the link of code (AJAX, JS) I am planning to use to generate the graph: http://jsxgraph.uni-bayreuth.de/wiki/index.php/Real-time_graphing

Is there a way I can link the output of the above Scala program to this code and generate the dynamic graph? The JS code uses the Ajax.Request function to fetch its input from the PHP file (shown in the link above). To make my question clearer: is there a way I can substitute the Scala code for the PHP code?

Thanks. 

Prashant Sharma

Jun 19, 2013, 5:47:32 AM
to spark...@googlegroups.com
It is definitely possible! But the above example is for the receiver end.
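(As an illustrative sketch of what that could look like on the serving side: the JDK's built-in com.sun.net.httpserver can stand in for the PHP file that the JSXGraph AJAX code polls. Everything here except the JDK API is made up for the example: the object name, the port, and the idea of a shared latestCount that the streaming job updates from its output operation.)

import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicLong
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

object CountEndpoint {
  // Updated by the streaming job, e.g. from inside foreachRDD.
  val latestCount = new AtomicLong(0L)

  def start(port: Int) {
    val server = HttpServer.create(new InetSocketAddress(port), 0)
    server.createContext("/count", new HttpHandler {
      def handle(exchange: HttpExchange) {
        val body = latestCount.get.toString.getBytes("UTF-8")
        exchange.getResponseHeaders.set("Content-Type", "text/plain")
        exchange.sendResponseHeaders(200, body.length)
        exchange.getResponseBody.write(body)
        exchange.getResponseBody.close()
      }
    })
    server.start()
  }
}

The Ajax.Request in the JSXGraph example would then point at /count instead of the PHP script. As Ian noted above, running this inside the same process as the streaming job is really only reasonable for a demo.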



Aswin Roy

Jun 19, 2013, 7:40:55 AM
to spark...@googlegroups.com
Hi Prashant,

Are you talking about the Scala program? Can you clarify what the issue is?

Thanks. 
Aswin Jose Roy
Computer Science and Engineering (2010-2014)
Govt. Model Engineering College
Cochin-21

Sam Bessalah

Jun 19, 2013, 7:53:16 AM
to spark...@googlegroups.com
There are plenty of ways to do that. As mentioned above, you can use plain JS/Node or Play with Redis pub/sub, or use a LinkedBlockingQueue to push the data in REST fashion within an actor. But the linked queue might blow up due to the throughput of Spark Streaming...
If you're interested, here's a state-of-the-art presentation using Storm/Kafka/Redis/D3.js/Node for real-time visualisation. I hope it helps.
http://www.youtube.com/watch?v=DRgs8L4AOgc&feature=share&list=UUNBXEH4QKTLnrEUTO8TYdmg


Aswin Roy

Jun 19, 2013, 8:00:03 AM
to spark...@googlegroups.com
Hi Sam,

Thanks for your reply. Actually, I am a research intern learning to work with Spark. I can't use Storm at the moment, but I'm sure it'll be helpful in the future. Right now, I need help linking the output of the Spark Streaming program to the JS/AJAX code to produce the dynamic graph.

Regards. 




Sam Bessalah

Jun 19, 2013, 8:57:51 AM
to spark...@googlegroups.com
Actually the idea remains the same: you can use Spark Streaming to write your computed result into Redis, and on the other side the front-end code will act in exactly the same way. In this use case Storm and Spark are interchangeable. I'll write down more later.
But your approach with an actor is also good; I was just referring to other solutions.

Aswin Roy

Jun 19, 2013, 3:54:45 PM
to spark...@googlegroups.com
Hi Sam,

I understand that it is a great option and I'm definitely trying it out. But I was thinking of using an actor right now. Can you advise me on how to write out the computed result in this case (ActorWordCount) and feed it as input to the front end? Can JSON formats be used? Or is there a better way?

Thanks and regards. 
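(On the JSON question, one hedged sketch: name the reduceByKey result in ActorWordCount wordCounts, collect each small per-batch result on the driver, and write it out as a JSON object for the AJAX code to fetch. The file name is made up, and the words are assumed not to need JSON escaping.)

// Write each batch's word counts as a JSON object, e.g. {"words": 3, "may": 1}.
wordCounts.foreachRDD { rdd =>
  val pairs = rdd.collect() // Array[(String, Int)], small per batch
  val json = pairs
    .map { case (word, count) => "\"%s\": %d".format(word, count) }
    .mkString("{", ", ", "}")
  val pw = new java.io.PrintWriter("counts.json")
  pw.println(json)
  pw.close()
}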




Aswin Roy

Jun 24, 2013, 2:12:48 AM
to spark...@googlegroups.com
Hi all,

I am trying to edit the ActorWordCount.scala program in the spark-streaming examples. My aim is to count the occurrences of a character in the actor-generated string and write the output to a text file. I have tried the following code:

val lines = ssc.actorStream[String](
  Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format(
    host, port.toInt))), "SampleReceiver")


var mything = lines.count('r'==)
var pw = new java.io.PrintWriter("simp.txt")
pw.println(List(mything).mkString("\t"))
pw.close

It shows the following error when I compile using "sbt/sbt compile":
too many arguments for method count: ()spark.streaming.DStream[Long]
[error] var mything = lines.count('r'==)

Please help me figure out what the problem is.

Thanks.

Sam Bessalah

Jun 24, 2013, 3:20:33 AM
to spark...@googlegroups.com
The count method doesn't take any arguments. You should look at the Scaladoc for Spark Streaming.
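(For reference, a minimal corrected sketch: filter the characters first, then count, and do the file write inside a streaming output operation rather than once at program-construction time. foreachRDD is assumed here; very old Spark releases named it foreach on DStream. The file name matches the one above.)

// Count occurrences of 'r' per batch and append each count to simp.txt.
val rCounts = lines
  .flatMap(_.toSeq)   // split each line into its characters
  .filter(_ == 'r')   // keep only the character of interest
  .count()            // DStream[Long]: one element per batch

rCounts.foreachRDD { rdd =>
  val n = rdd.collect().headOption.getOrElse(0L)    // a single Long per batch
  val fw = new java.io.FileWriter("simp.txt", true) // append mode
  fw.write(n + "\n")
  fw.close()
}

ssc.start()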