Thanks. I hope I'm not posting too much code; I slimmed it down as much as I could. All of the properties are just at the top. In short, I'm taking some data off a Kafka stream, manipulating it a bit, and attempting to print out the results. If I remove some sections and just print 'salesWithOomph', for example, everything works fine. I'm lost at a meta level: how can I diagnose an example like the one shown above (I couldn't find any other hints in the logs)? I'd rather learn what my debugging strategy should be than get a solution to my particular problem.
// Imports assumed for completeness; package names vary by Spark version
// (this is the old pre-KafkaUtils API), so adjust to match your build.
import java.util.concurrent.atomic.AtomicLong
import spark.storage.StorageLevel
import spark.streaming.{Seconds, StreamingContext}

object GeoSalePerformance extends App {
  System.setProperty("spark.local.dir", "tmp")

  val ssc = new StreamingContext("local", "GeoSalePerformance", Seconds(10), "/web/spark")
  val geoIpClient = new LocationServiceImpl("http://localhost:7001/geoip_service/v2")
  val events = ssc.kafkaStream("localhost:2181/kafka", "GeoSalePerformance15",
    Map("svcclickstreamrefinement.ga.productlistingpageview" -> 1))
  val totalsMap = collection.mutable.Map[String, AtomicLong]()

  // listingPageViews => (sale id -> Map(msa -> 1))
  val listingPageViews = events.map(e => CommonsJson.parse[ProductListingPageViewed](e)).flatMap { e =>
    e.getBasicRequestInfo.getIp.flatMap { ip =>
      val location   = geoIpClient.getLocation(ip)
      val postalCode = Option(location).flatMap(l => Option(l.getPostalCode()))
      val msaOpt     = postalCode.flatMap(z => Option(geoIpClient.getMsa(z)))
      val msaName    = msaOpt.flatMap(m => Option(m.getName))
      msaName.map(n => (e.getSaleId, Map[String, Long](n -> 1L)))
    }
  }

  // salesWithMsaFrequency => (sale id -> Map(msa -> count))
  val salesWithMsaFrequency = listingPageViews.reduceByKey { (v1, v2) =>
    v1.foldLeft(v2)((acc, entry) =>
      acc.updated(entry._1, entry._2 + acc.get(entry._1).getOrElse(0L))
    )
  }

  // totalsRrd => Map(msa -> count)
  val totalsRrd = listingPageViews.map(pair => pair._2).reduce { (mapOne, mapTwo) =>
    mapOne.foldLeft(mapTwo) { (acc, element) =>
      acc.updated(element._1, element._2 + acc.get(element._1).getOrElse(0L))
    }
  }.persist(StorageLevel.MEMORY_AND_DISK)

  // Update the map with totals count.
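One thing I did to narrow things down: the map-merge fold that both reduceByKey and reduce use above can be pulled out and exercised on its own, with no Spark context at all, to rule out the transformation logic itself. Here's a minimal standalone sketch of that check (the object and value names are just for illustration):

```scala
// Standalone sanity check of the map-merge used in the stream pipeline:
// merges two msa -> count maps by summing counts for shared keys.
object MergeCheck extends App {
  def mergeCounts(m1: Map[String, Long], m2: Map[String, Long]): Map[String, Long] =
    m1.foldLeft(m2) { (acc, entry) =>
      // Add this entry's count to whatever the accumulator already has (or 0).
      acc.updated(entry._1, entry._2 + acc.get(entry._1).getOrElse(0L))
    }

  val a = Map("Denver" -> 2L, "Austin" -> 1L)
  val b = Map("Denver" -> 3L)

  // Shared key "Denver" sums to 5; "Austin" carries over unchanged.
  assert(mergeCounts(a, b) == Map("Denver" -> 5L, "Austin" -> 1L))
  println(mergeCounts(a, b))
}
```

That at least confirms the fold itself behaves, so whatever is going wrong is in the streaming plumbing rather than the merge.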