// Per-tweet processing: materialize the RDD on the driver and run each
// analysis helper on every tweet. NOTE(review): this fragment is the interior
// of a foreach callback whose opening line is outside this chunk; the braces
// at the end close scopes opened earlier in the file.
t.collect.foreach(e => {
println(e.screenName)
// Analysis helpers are defined elsewhere in this file; each receives the
// current keyword match pattern.
user10Tweets(e.text, e.screenName, pattern)
freqUser(e.screenName, pattern)
// Assumes e.retweets is always a parseable numeric string — TODO confirm;
// a non-numeric value would throw NumberFormatException inside the job.
top10Tweets(e.text, e.retweets.toInt, e.mentions, pattern)
userLastTime(e.screenName, e.created_at, pattern)
hashTags(e.hash_tag, pattern)
urls(e.urls, pattern)
})
// When a new search keyword has been flagged (isNewKeyAvlbl is set elsewhere),
// rebuild the Twitter filters / match pattern and restart the pipeline.
if (isNewKeyAvlbl) {
var filters1= Array("")
var pattern1= ""
println("received a new keyword")
// NOTE(review): the log pasted below this code ("AskTimeoutException:
// sending to terminated ref breaks promises" raised inside
// StreamingContext.start, called from processTweet) indicates this same
// stopped context is being reused. A StreamingContext cannot be restarted
// after stop(), and stop() by default also stops the SparkContext —
// processTweet likely needs to build a fresh context instead of reusing ssc.
ssc.stop()
// Presumably r is a key/value store (e.g. Redis client) holding the latest
// search keyword — verify against its definition elsewhere in the file.
// On a hit, the shared var `keywords` is overwritten; on a miss it keeps
// its previous value (possibly null), which the branch below relies on.
r.get("LATEST_SEARCH_KEY") match {
case Some(s: String) => keywords = s
case None => println("should return mor")
}
// Derive the filter words (underscore-separated keyword string split into
// words) and the uppercase match pattern; fall back to the hard-coded
// defaults when no keyword has ever been retrieved.
if (keywords != null) {
pattern1=keywords.toUpperCase
filters1=keywords.replace("_", " ").split(" ")
}
else {
filters1 = Array("bore", "boring", "boredom")
pattern1 = "BORE_BORING_BOREDOM"
}
// Clear the flag before re-entering so the next batch does not re-trigger.
isNewKeyAvlbl = false
// Re-enter the pipeline with the new filters; see NOTE above about the
// stopped StreamingContext being passed back in.
processTweet(myConsumerKey, myConsumerSecret, myAccessToken, myTokenSecret, filters1, pattern1, ssc)
}
})
// System.clearProperty("spark.driver.port")
ssc.start()
}
13/08/22 15:16:24 INFO streaming.Scheduler: Scheduler stopped
13/08/22 15:16:24 INFO spark.MapOutputTrackerActor: MapOutputTrackerActor stopped!
13/08/22 15:16:24 INFO network.ConnectionManager: Selector thread was interrupted!
13/08/22 15:16:24 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(
bangvmplrtda01.sapient.com,60425)
13/08/22 15:16:24 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(
bangvmplRTDA01.sapient.com,60425)
13/08/22 15:16:24 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(
bangvmplrtda01.sapient.com,48136)
13/08/22 15:16:24 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
13/08/22 15:16:24 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(
bangvmplrtda01.sapient.com,48135)
13/08/22 15:16:24 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found
13/08/22 15:16:24 WARN network.ConnectionManager: All connections not cleaned up
13/08/22 15:16:24 INFO network.ConnectionManager: ConnectionManager stopped
13/08/22 15:16:24 INFO storage.MemoryStore: MemoryStore cleared
13/08/22 15:16:24 INFO storage.BlockManager: BlockManager stopped
13/08/22 15:16:24 INFO storage.BlockManagerMasterActor: Stopping BlockManagerMaster
13/08/22 15:16:24 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
13/08/22 15:16:24 INFO spray.HttpService: Stopped
13/08/22 15:16:24 INFO spray.SprayCanRootService: spray RootService stopped
13/08/22 15:16:24 INFO spark.SparkContext: Successfully stopped SparkContext
13/08/22 15:16:24 INFO streaming.StreamingContext: StreamingContext stopped successfully
13/08/22 15:16:24 ERROR streaming.JobManager: Running streaming job 5402 @ 1377164784000 ms failed
akka.pattern.AskTimeoutException: sending to terminated ref breaks promises
at akka.pattern.AskSupport$class.ask(AskSupport.scala:76)
at akka.pattern.package$.ask(package.scala:43)
at akka.pattern.AskSupport$AskableActorRef.$qmark(AskSupport.scala:153)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:505)
at spark.streaming.NetworkInputTracker.start(NetworkInputTracker.scala:41)
at spark.streaming.StreamingContext.start(StreamingContext.scala:468)
at spark.streaming.examples.TwitterDSST$.processTweet(TwitterDSST.scala:164)
at spark.streaming.examples.TwitterDSST$$anonfun$processTweet$1.apply(TwitterDSST.scala:159)
at spark.streaming.examples.TwitterDSST$$anonfun$processTweet$1.apply(TwitterDSST.scala:125)
at spark.streaming.DStream$$anonfun$foreach$1.apply(DStream.scala:460)
at spark.streaming.DStream$$anonfun$foreach$1.apply(DStream.scala:460)
at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:22)
at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:21)
at spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:21)
at spark.streaming.Job.run(Job.scala:10)
at spark.streaming.JobManager$JobHandler.run(JobManager.scala:17)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)
13/08/22 15:16:24 INFO streaming.DStreamGraph: Clearing old metadata for time 1377164784000 ms
13/08/22 15:16:24 INFO dstream.ForEachDStream: Cleared 0 RDDs that were older than 1377164782000 ms:
13/08/22 15:16:24 INFO dstream.MappedDStream: Cleared 1 RDDs that were older than 1377164782000 ms: 1377164782000 ms
13/08/22 15:16:24 INFO dstream.TwitterInputDStream: Cleared 1 RDDs that were older than 1377164782000 ms: 1377164782000 ms
Exception in thread "pool-3-thread-1" java.lang.NullPointerException
at spark.streaming.Time.$minus(Time.scala:25)
at spark.streaming.DStream$$anonfun$clearOldMetadata$1.apply(DStream.scala:323)
at spark.streaming.DStream$$anonfun$clearOldMetadata$1.apply(DStream.scala:323)
at spark.Logging$class.logInfo(Logging.scala:31)
at spark.streaming.DStream.logInfo(DStream.scala:39)
at spark.streaming.DStream.clearOldMetadata(DStream.scala:322)
at spark.streaming.DStreamGraph$$anonfun$clearOldMetadata$2.apply(DStreamGraph.scala:101)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at spark.streaming.DStreamGraph.clearOldMetadata(DStreamGraph.scala:101)
at spark.streaming.Scheduler.clearOldMetadata(Scheduler.scala:100)
at spark.streaming.JobManager.spark$streaming$JobManager$$clearJob(JobManager.scala:62)
at spark.streaming.JobManager$JobHandler.run(JobManager.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:679)
13/08/22 15:16:24 INFO storage.MemoryStore: ensureFreeSpace(3717) called with curMem=16907806, maxMem=339585269
13/08/22 15:16:24 INFO storage.MemoryStore: Block input-0-1377164784400 stored as bytes to memory (size 3.6 KB, free 307.7 MB)
13/08/22 15:16:24 WARN storage.BlockManager: Putting block input-0-1377164784400 failed
spark.SparkException: Error sending message to BlockManager as driverActor is null [message = spark.storage.UpdateBlockInfo@38a038e6]