restarting streaming context


Arindam Paul

Aug 12, 2013, 6:50:20 AM
to spark...@googlegroups.com
Hi,

We start Spark Streaming on a Twitter stream filtered by some keywords, and keep listening on a Redis pub/sub channel for new keywords. The purpose is to restart the Twitter stream with the new keywords we receive through Redis pub/sub.
When we get new keywords through Redis pub/sub, we stop the StreamingContext using ssc.stop(), then create the new Twitter stream with the new keywords and call ssc.start().

Everything up to ssc.stop() (in the callback function) works, but ssc.start() then fails.
We tried creating a new StreamingContext at this point, but, as expected, that did not help either.

Here is the pseudocode:
###################
object TwitterStreamsEx {
  // ...
  def main(args: Array[String]) {
    // ...
    val stream = ssc.twitterStream(myConsumerKey, myConsumerSecret, myAccessToken, myTokenSecret, filters)
    val tweets = stream.map(status => tweetDetails(status))
    tweets.foreach(t => {
      // do some processing
    })
    ssc.start()

    Sub.sub("UPDATED_KEYWORD")

  } // main ends
}
###################
object Sub {

  val system = ActorSystem("sub")
  val r_pubsub = new RedisClient("localhost", 6379)
  val s = system.actorOf(Props(new Subscriber(r_pubsub)))
  s ! Register(callback)

  def sub(channels: String*) = {
    s ! Subscribe(channels.toArray)
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    // ...
    case M(channel, msg) =>
      msg match {
        case "TRUE" =>
          println("New Keyword received ..")
          ssc.stop()
          // Get the new filters

          val stream = ssc.twitterStream(myConsumerKey, myConsumerSecret, myAccessToken, myTokenSecret, filters)
          val tweets = stream.map(status => tweetDetails(status))
          tweets.foreach(t => {
            // do some processing
          })
          ssc.start()
      }
  }
}
###################


Ian O'Connell

Aug 12, 2013, 10:54:24 AM
to spark...@googlegroups.com
What error are you seeing with the ssc.start?

Try calling System.clearProperty("spark.driver.port") before you call start.
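As far as we understand Spark of this vintage (0.7.x), the driver records the port it actually bound in the JVM system property spark.driver.port, so a value left behind by a stopped context can make the next context try to reuse it. The sketch below is a hypothetical helper (DriverProps is our name, not a Spark API) that clears that key, plus spark.driver.host, which Arindam asks about later in the thread. It uses only the JDK, so it runs without Spark.

```scala
// Hypothetical helper (not part of Spark): clears the driver properties a
// stopped context may leave behind, before the next context is created.
object DriverProps {
  // The two keys discussed in this thread; other Spark versions may use others.
  val Keys: Seq[String] = Seq("spark.driver.port", "spark.driver.host")

  /** Removes each key from the JVM system properties (no-op if absent). */
  def clearDriverProperties(): Unit = Keys.foreach(key => System.clearProperty(key))
}
```

The intended call site is after ssc.stop() has returned and before the next StreamingContext is constructed.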


--
You received this message because you are subscribed to the Google Groups "Spark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Arindam Paul

Aug 12, 2013, 11:51:10 PM
to spark...@googlegroups.com, i...@ianoconnell.com
Thanks Ian. We tried adding System.clearProperty("spark.driver.port") before ssc.start, but it did not help; we still get an exception at this point.
We catch the exception and print exception.getMessage(), which gives:
sending to terminated ref breaks promises

BTW, what is System.clearProperty("spark.driver.port") supposed to do?
Are we doing anything wrong here?

Arindam Paul

Aug 13, 2013, 1:21:08 AM
to spark...@googlegroups.com, i...@ianoconnell.com
Here is the exception that we see in the log.

13/08/13 10:41:21 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
13/08/13 10:41:21 WARN network.ConnectionManager: All connections not cleaned up
13/08/13 10:41:21 INFO network.ConnectionManager: ConnectionManager stopped
13/08/13 10:41:21 INFO storage.MemoryStore: MemoryStore cleared
13/08/13 10:41:21 INFO storage.BlockManager: BlockManager stopped
13/08/13 10:41:21 INFO storage.BlockManagerMasterActor: Stopping BlockManagerMaster
13/08/13 10:41:21 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
13/08/13 10:41:21 INFO spray.HttpService: Stopped
13/08/13 10:41:21 INFO spray.SprayCanRootService: spray RootService stopped
13/08/13 10:41:21 INFO storage.MemoryStore: ensureFreeSpace(5808) called with curMem=91991, maxMem=339585269
13/08/13 10:41:21 INFO storage.MemoryStore: Block input-0-1376370681200 stored as bytes to memory (size 5.7 KB, free 323.8 MB)
13/08/13 10:41:21 WARN storage.BlockManager: Putting block input-0-1376370681200 failed
spark.SparkException: Error sending message to BlockManager as driverActor is null [message = spark.storage.UpdateBlockInfo@4f57011e]
        at spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:127)
        at spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:56)
        at spark.storage.BlockManager.spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:247)
        at spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:217)
        at spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:560)
        at spark.storage.BlockManager.put(BlockManager.scala:534)
        at spark.streaming.dstream.NetworkReceiver.pushBlock(NetworkInputDStream.scala:147)
        at spark.streaming.dstream.NetworkReceiver$BlockGenerator.spark$streaming$dstream$NetworkReceiver$BlockGenerator$$keepPushingBlocks(NetworkInputDStream.scala:251)
        at spark.streaming.dstream.NetworkReceiver$BlockGenerator$$anon$1.run(NetworkInputDStream.scala:205)
13/08/13 10:41:21 INFO dstream.NetworkReceiver$BlockGenerator: Data handler stopped
13/08/13 10:41:21 WARN twitter4j.TwitterStreamImpl: Stream already closed.
13/08/13 10:41:21 INFO dstream.TwitterReceiver: Twitter receiver stopped
13/08/13 10:41:21 INFO spark.SparkContext: Successfully stopped SparkContext
13/08/13 10:41:21 INFO streaming.StreamingContext: StreamingContext stopped successfully
13/08/13 10:41:22 INFO streaming.Scheduler: Scheduler stopped
13/08/13 10:41:22 INFO spark.SparkContext: SparkContext already stopped
13/08/13 10:41:22 INFO streaming.StreamingContext: StreamingContext stopped successfully

Arindam Paul

Aug 19, 2013, 12:19:16 AM
to spark...@googlegroups.com, i...@ianoconnell.com, tathagat...@gmail.com
Hi, can anyone please comment on this? We tried several different options, but nothing seemed to work.

Thanks.

Tathagata Das

Aug 21, 2013, 12:05:58 PM
to Arindam Paul, spark...@googlegroups.com, Ian O'Connell
Hi Arindam, 

The errors in the log you posted are more or less expected when you stop the Spark context. However, as far as I know, they should not prevent the system from starting up again. What error do you get when you start the streaming context again?

TD



Arindam Paul

Aug 22, 2013, 6:13:21 AM
to spark...@googlegroups.com, Arindam Paul, Ian O'Connell
Hi TD,

The log file has no more information; printStackTrace() prints only what we see in the log file.

Here is the main part of our code, which fails on ssc.start() when isNewKeyAvlbl is true. isNewKeyAvlbl is a global variable that becomes true as soon as the Redis channel we are listening to receives a message, and the following ssc.start() then fails. We do not know whether this is the right way to restart the Twitter stream with new keywords.
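Independently of how the restart is structured, the isNewKeyAvlbl flag is written by the Redis callback thread and read by a Spark job thread. A plain global var gives no cross-thread visibility guarantee, and the read-then-reset sequence (if (isNewKeyAvlbl) ... isNewKeyAvlbl = false) can drop a keyword that arrives in between. A minimal sketch, assuming you are free to replace the flag: hold the pending keyword in an AtomicReference and take it with getAndSet, which reads and clears in one atomic step. KeywordSignal is a hypothetical name, and this version carries the keyword itself instead of re-reading LATEST_SEARCH_KEY from Redis.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical replacement for the global isNewKeyAvlbl flag.
object KeywordSignal {
  // None means "no new keyword"; Some(k) means k is waiting to be picked up.
  private val pending = new AtomicReference[Option[String]](None)

  /** Called from the Redis pub/sub callback when a new keyword arrives. */
  def publish(keyword: String): Unit = pending.set(Some(keyword))

  /** Returns the pending keyword, if any, and clears it in one atomic step. */
  def take(): Option[String] = pending.getAndSet(None)
}
```

If two keywords arrive before the reader checks, only the latest is kept, which matches the "latest search key" semantics of the original. This fixes only the handoff between threads, not where the restart happens.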

######################
  def processTweet(myConsumerKey: String, myConsumerSecret:String, myAccessToken: String, myTokenSecret: String, filters: Array[java.lang.String], pattern: String, ssc: StreamingContext): Any = {


    val stream = ssc.twitterStream(myConsumerKey, myConsumerSecret, myAccessToken, myTokenSecret, filters)

    val tweets = stream.map(status => tweetDetails(status))
    tweets.foreach(t => {
                t.collect.foreach(e => {
                   println(e.screenName)
                   user10Tweets(e.text, e.screenName, pattern)
                   freqUser(e.screenName, pattern)
                   top10Tweets(e.text, e.retweets.toInt, e.mentions, pattern)
                   userLastTime(e.screenName, e.created_at, pattern)
                   hashTags(e.hash_tag, pattern)
                   urls(e.urls, pattern)
                })

                if (isNewKeyAvlbl) {
                        var filters1= Array("")
                        var pattern1= ""

                        println("received a new keyword")
                        ssc.stop()

                        r.get("LATEST_SEARCH_KEY")  match {
                                case Some(s: String) => keywords = s
                                case None => println("should return mor")
                        }

                        if (keywords != null) {
                                pattern1=keywords.toUpperCase
                                filters1=keywords.replace("_", " ").split(" ")
                        }
                        else {
                                filters1 = Array("bore", "boring", "boredom")
                                pattern1 = "BORE_BORING_BOREDOM"
                        }
                        isNewKeyAvlbl = false
                        processTweet(myConsumerKey, myConsumerSecret, myAccessToken, myTokenSecret, filters1, pattern1, ssc)
                }
    })

   // System.clearProperty("spark.driver.port")
    ssc.start()

  }
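The keyword-to-filters logic inside processTweet can be pulled out into a pure function, so whichever thread ends up restarting the stream can reuse it, and it can be tested without Spark or Redis. This sketch mirrors the if (keywords != null) branch above; Keywords.parse is a hypothetical name. Like the original, it takes keywords as a possibly-null string, so the caller still decides what to do when LATEST_SEARCH_KEY is missing.

```scala
// Hypothetical helper extracted from processTweet's restart branch.
object Keywords {
  /** Maps a (possibly null) keyword string to (filters, pattern), as processTweet does. */
  def parse(keywords: String): (Array[String], String) =
    Option(keywords) match {
      // "big_data" becomes filters Array("big", "data") and pattern "BIG_DATA".
      case Some(k) => (k.replace("_", " ").split(" "), k.toUpperCase)
      // Same fallback as the original code when no keyword is known.
      case None => (Array("bore", "boring", "boredom"), "BORE_BORING_BOREDOM")
    }
}
```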



Here is what we see in the log file:


13/08/22 15:16:24 INFO streaming.Scheduler: Scheduler stopped
13/08/22 15:16:24 INFO spark.MapOutputTrackerActor: MapOutputTrackerActor stopped!
13/08/22 15:16:24 INFO network.ConnectionManager: Selector thread was interrupted!
13/08/22 15:16:24 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(bangvmplrtda01.sapient.com,60425)
13/08/22 15:16:24 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(bangvmplRTDA01.sapient.com,60425)
13/08/22 15:16:24 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(bangvmplrtda01.sapient.com,48136)
13/08/22 15:16:24 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
13/08/22 15:16:24 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(bangvmplrtda01.sapient.com,48135)
13/08/22 15:16:24 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
13/08/22 15:16:24 WARN network.ConnectionManager: All connections not cleaned up
13/08/22 15:16:24 INFO network.ConnectionManager: ConnectionManager stopped
13/08/22 15:16:24 INFO storage.MemoryStore: MemoryStore cleared
13/08/22 15:16:24 INFO storage.BlockManager: BlockManager stopped
13/08/22 15:16:24 INFO storage.BlockManagerMasterActor: Stopping BlockManagerMaster
13/08/22 15:16:24 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
13/08/22 15:16:24 INFO spray.HttpService: Stopped
13/08/22 15:16:24 INFO spray.SprayCanRootService: spray RootService stopped
13/08/22 15:16:24 INFO spark.SparkContext: Successfully stopped SparkContext
13/08/22 15:16:24 INFO streaming.StreamingContext: StreamingContext stopped successfully
13/08/22 15:16:24 ERROR streaming.JobManager: Running streaming job 5402 @ 1377164784000 ms failed
akka.pattern.AskTimeoutException: sending to terminated ref breaks promises
        at akka.pattern.AskSupport$class.ask(AskSupport.scala:76)
        at akka.pattern.package$.ask(package.scala:43)
        at akka.pattern.AskSupport$AskableActorRef.$qmark(AskSupport.scala:153)
        at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:505)
        at spark.streaming.NetworkInputTracker.start(NetworkInputTracker.scala:41)
        at spark.streaming.StreamingContext.start(StreamingContext.scala:468)
        at spark.streaming.examples.TwitterDSST$.processTweet(TwitterDSST.scala:164)
        at spark.streaming.examples.TwitterDSST$$anonfun$processTweet$1.apply(TwitterDSST.scala:159)
        at spark.streaming.examples.TwitterDSST$$anonfun$processTweet$1.apply(TwitterDSST.scala:125)
        at spark.streaming.DStream$$anonfun$foreach$1.apply(DStream.scala:460)
        at spark.streaming.DStream$$anonfun$foreach$1.apply(DStream.scala:460)
        at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:22)
        at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:21)
        at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:21)
        at spark.streaming.Job.run(Job.scala:10)
        at spark.streaming.JobManager$JobHandler.run(JobManager.scala:17)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
13/08/22 15:16:24 INFO streaming.DStreamGraph: Clearing old metadata for time 1377164784000 ms
13/08/22 15:16:24 INFO dstream.ForEachDStream: Cleared 0 RDDs that were older than 1377164782000 ms:
13/08/22 15:16:24 INFO dstream.MappedDStream: Cleared 1 RDDs that were older than 1377164782000 ms: 1377164782000 ms
13/08/22 15:16:24 INFO dstream.TwitterInputDStream: Cleared 1 RDDs that were older than 1377164782000 ms: 1377164782000 ms
Exception in thread "pool-3-thread-1" java.lang.NullPointerException
        at spark.streaming.Time.$minus(Time.scala:25)
        at spark.streaming.DStream$$anonfun$clearOldMetadata$1.apply(DStream.scala:323)
        at spark.streaming.DStream$$anonfun$clearOldMetadata$1.apply(DStream.scala:323)
        at spark.Logging$class.logInfo(Logging.scala:31)
        at spark.streaming.DStream.logInfo(DStream.scala:39)
        at spark.streaming.DStream.clearOldMetadata(DStream.scala:322)
        at spark.streaming.DStreamGraph$$anonfun$clearOldMetadata$2.apply(DStreamGraph.scala:101)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at spark.streaming.DStreamGraph.clearOldMetadata(DStreamGraph.scala:101)
        at spark.streaming.Scheduler.clearOldMetadata(Scheduler.scala:100)
        at spark.streaming.JobManager.spark$streaming$JobManager$$clearJob(JobManager.scala:62)
        at spark.streaming.JobManager$JobHandler.run(JobManager.scala:24)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:679)
13/08/22 15:16:24 INFO storage.MemoryStore: ensureFreeSpace(3717) called with curMem=16907806, maxMem=339585269
13/08/22 15:16:24 INFO storage.MemoryStore: Block input-0-1377164784400 stored as bytes to memory (size 3.6 KB, free 307.7 MB)
13/08/22 15:16:24 WARN storage.BlockManager: Putting block input-0-1377164784400 failed
spark.SparkException: Error sending message to BlockManager as driverActor is null [message = spark.storage.UpdateBlockInfo@38a038e6]
        at spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:127)
        at spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:56)
        at spark.storage.BlockManager.spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:247)
        at spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:217)
        at spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:560)
        at spark.storage.BlockManager.put(BlockManager.scala:534)
        at spark.streaming.dstream.NetworkReceiver.pushBlock(NetworkInputDStream.scala:147)
        at spark.streaming.dstream.NetworkReceiver$BlockGenerator.spark$streaming$dstream$NetworkReceiver$BlockGenerator$$keepPushingBlocks(NetworkInputDStream.scala:251)
        at spark.streaming.dstream.NetworkReceiver$BlockGenerator$$anon$1.run(NetworkInputDStream.scala:205)
13/08/22 15:16:24 INFO dstream.NetworkReceiver$BlockGenerator: Data handler stopped
13/08/22 15:16:24 WARN twitter4j.TwitterStreamImpl: Stream already closed.
13/08/22 15:16:24 INFO dstream.TwitterReceiver: Twitter receiver stopped


Thanks,
Arindam.

Tathagata Das

Aug 22, 2013, 5:39:56 PM
to spark...@googlegroups.com
Hello Arindam,

The way you are starting and stopping the streaming context is really weird. Starting a new streaming context from within the foreach function that is running inside the previous streaming context is not going to work: when you stop the previous streaming context, it terminates the thread running the foreach function, so the new streaming context never actually gets started. Instead, I propose the following control flow.
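The failure mode can be reproduced by analogy with plain JDK concurrency (an analogy, not Spark code): a task running on an executor shuts that executor down and then tries to hand it more work. Arindam's stack trace shows StreamingContext.start() failing inside actorOf with "sending to terminated ref", which is consistent with the same ssc being reused after stop() terminated its actor system. SelfRestartAnalogy is a hypothetical name.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Analogy only (no Spark): the executor plays the role of the StreamingContext,
// and the task plays the role of the foreach function running inside it.
object SelfRestartAnalogy {
  /** Returns true if the task's attempt to reuse its own stopped executor was rejected. */
  def run(): Boolean = {
    val pool = Executors.newSingleThreadExecutor()
    val rejected = new AtomicBoolean(false)
    pool.execute(new Runnable {
      def run(): Unit = {
        pool.shutdownNow() // like ssc.stop() called from inside foreach
        try pool.execute(new Runnable { def run(): Unit = () }) // like ssc.start() on the same ssc
        catch { case _: RejectedExecutionException => rejected.set(true) }
      }
    })
    pool.awaitTermination(5, TimeUnit.SECONDS) // wait for the task to finish
    rejected.get
  }
}
```

In both cases the remedy is the same: stop the service from the thread that owns it, then build a fresh instance rather than restarting the stopped one.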

Have a dedicated thread start the streaming context and wait for a signal to close it. Something like this. 

class StreamingRunner extends Thread() {
  override def run() {
    // Clear system property driver.host, etc
    // Start streaming context
    // Wait for some signal that indicates stopping of this context, maybe a global variable or something
    // Stop the streaming context cleanly and reset
  }
}

Then, in a completely separate thread, monitor the global variable isNewKeyAvlbl:

if (isNewKeyAvlbl) {
  // Signal the current running streaming context to stop
  // Wait for the running thread to finish to ensure that the previous streaming context has been stopped cleanly
  // Start the new StreamingRunner thread with the new keys 
}
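The two-thread control flow above can be sketched in runnable form. To keep it self-contained, StreamingJob is a hypothetical stand-in for StreamingContext: in real code the factory would create the context, the Twitter stream and its foreach, and start()/stop() would be ssc.start()/ssc.stop(). The runner clears the driver properties before constructing the job because, as far as we understand, the context reads them when it is created. The supervisor replaces polling isNewKeyAvlbl with a blocking queue that the Redis callback would fill (an assumption), and the idle timeout exists only so the sketch terminates; a real supervisor would loop indefinitely.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-in for StreamingContext so the sketch runs without Spark.
trait StreamingJob {
  def start(): Unit
  def stop(): Unit
}

/** Dedicated thread: owns one streaming job from creation to stop. */
class StreamingRunner(newJob: () => StreamingJob) extends Thread {
  private val stopSignal = new CountDownLatch(1)

  /** Called from another thread to ask this runner to stop. */
  def requestStop(): Unit = stopSignal.countDown()

  override def run(): Unit = {
    // Clear leftovers from the previous driver before building a new context.
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.driver.host")
    val job = newJob()  // e.g. new StreamingContext + twitterStream + foreach
    job.start()
    stopSignal.await()  // block until requestStop() is called
    job.stop()          // stop from the owning thread, never from inside a job
  }
}

/** Monitoring thread: restarts the runner whenever a new keyword arrives. */
class Supervisor(makeJob: String => StreamingJob, initialKeyword: String) {
  // The Redis pub/sub callback would put() new keywords here (assumption).
  val newKeywords = new LinkedBlockingQueue[String]()
  private var current: StreamingRunner = _

  private def launch(keyword: String): Unit = {
    current = new StreamingRunner(() => makeJob(keyword))
    current.start()
  }

  /** Runs until no keyword arrives for idleMillis, then stops the last runner. */
  def superviseUntilIdle(idleMillis: Long): Unit = {
    launch(initialKeyword)
    var next = newKeywords.poll(idleMillis, TimeUnit.MILLISECONDS)
    while (next != null) {
      current.requestStop() // 1. signal the running context to stop
      current.join()        // 2. wait until it has stopped cleanly
      launch(next)          // 3. start a new runner with the new keyword
      next = newKeywords.poll(idleMillis, TimeUnit.MILLISECONDS)
    }
    current.requestStop()
    current.join()
  }
}
```

Because the supervisor joins the old runner before launching the next one, at most one context is alive at a time, and stop() is never called from inside a job the context itself is running.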

As far as I can tell, this should work. The errors you are seeing are not unexpected: since you are stopping the streaming context at an arbitrary point in the middle of executing Spark jobs, some errors may occur. But those errors should not prevent you from starting another streaming context, provided the previous streaming context has been stopped and the system properties have been cleared. We do this kind of restart all the time in our streaming unit tests.

TD


Arindam Paul

Aug 22, 2013, 10:52:53 PM
to spark...@googlegroups.com
Hi TD,

I see, it is a synchronization problem, if I may call it that, and your approach definitely solves it. When you say clear system properties, I assume you mean spark.driver.host and spark.driver.port and no other properties.
Again, many thanks for the approach you highlighted.
We are going to try this out.

