Spark Streaming: block not found error


James Englert

Jul 22, 2013, 8:48:47 AM
to spark...@googlegroups.com
Hi,

I've been experimenting with Spark Streaming for a bit.  I keep running into errors like this when running in local mode:

java.lang.Exception: Could not compute split, block input-0-1374496930200 not found
    at spark.rdd.BlockRDD.compute(BlockRDD.scala:27)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:232)
    at spark.RDD.iterator(RDD.scala:221)
    at spark.rdd.MappedRDD.compute(MappedRDD.scala:12)
    at spark.RDD.computeOrReadCheckpoint(RDD.scala:232)
    at spark.RDD.iterator(RDD.scala:221)
    at spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:15)

Followed by:

2013-07-22 08:46:13,296 [pool-6-thread-1] INFO  spark.scheduler.DAGScheduler - Failed to run foreach at GeoSalePerformance.scala:61
2013-07-22 08:46:13,296 [pool-6-thread-1] ERROR spark.streaming.JobManager - Running streaming job 19 @ 1374496940000 ms failed
spark.SparkException: Job failed: Task 42.0:1 failed more than 4 times; aborting job java.lang.Exception: Could not compute split, block input-0-1374496930200 not found
    at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:671)
    at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:669)

GeoSalePerformance.scala, from line 60, looks like:

  totalsRrd.foreach { rrd =>
    rrd.foreach { map =>
      map.foreach { case (k, v) =>
        totalsMap.get(k).getOrElse({
          val al = new AtomicLong(0)
          totalsMap.put(k, al)
          al
        }).addAndGet(v)
      }
    }
    output.append("totalsmap: " + totalsMap)
  }


I'm a bit lost on how I would go about debugging issues like this.  I've scoured the entire log file and I don't see any information indicating what might have happened to block 'input-0-1374496930200'.  What steps would you recommend to debug issues like this?

Thanks,

--
Jim Englert
Gilt Groupe
2 Park Ave South, 5th Floor
New York, NY 10011
M: 847-707-2942
Please accept my invitation to join Gilt:
http://www.giltgroupe.com/invite/jenglert

Tathagata Das

Jul 22, 2013, 12:49:56 PM
to spark...@googlegroups.com

Can you elaborate on how you are receiving the data stream and what operations you are doing on it? Also, what configuration (Java properties, etc.) are you using?

TD



James Englert

Jul 22, 2013, 10:34:57 PM
to spark...@googlegroups.com
Thanks.  I hope I'm not posting too much code; I slimmed it down as much as I could.  All of the properties are set at the top.  In short, I'm taking some data off a Kafka stream, manipulating it a bit, and attempting to print out the results.  If I remove some sections and just print 'salesWithOomph', for example, everything works fine.  I'm lost at a meta-level.  How can I diagnose a failure like the one shown above (I couldn't find any other hints in the logs)?  I'd rather know what my debugging strategy should be than get a solution to my particular problem.

object GeoSalePerformance extends App {

  System.setProperty("spark.local.dir", "tmp")

  val ssc = new StreamingContext("local", "GeoSalePerformance", Seconds(10), "/web/spark")

  val geoIpClient = new LocationServiceImpl("http://localhost:7001/geoip_service/v2")

  val events = ssc.kafkaStream("localhost:2181/kafka", "GeoSalePerformance15", Map("svcclickstreamrefinement.ga.productlistingpageview" -> 1))

  val totalsMap = collection.mutable.Map[String, AtomicLong]()

  // e2 => (sale id -> Map(msa -> 1))
  val listingPageViews = events.map(e => (CommonsJson.parse[ProductListingPageViewed](e))).flatMap( e =>
    e.getBasicRequestInfo.getIp.flatMap { ip =>
      val location = geoIpClient.getLocation(ip)

      val postalCode = Option(location).flatMap(l => Option(l.getPostalCode()))

      val msaOpt = postalCode.flatMap(z => Option(geoIpClient.getMsa(z)))

      val msaName = msaOpt.flatMap(m => Option(m.getName))

      msaName.map(n => (e.getSaleId, Map[String, Long](n -> 1l)))
    })

  // e3 => (sale id -> Map(msa -> count))
  val salesWithMsaFrequency = listingPageViews.reduceByKey { (v1, v2) =>
    v1.foldLeft(v2)((acc, entry) =>
      acc.updated(entry._1, entry._2 + acc.get(entry._1).getOrElse(0l))
    )
  }

  // totalsRrd => Map(msa -> count)
  val totalsRrd = listingPageViews.map(pair => pair._2).reduce { (mapOne, mapTwo) =>
    mapOne.foldLeft(mapTwo) { (acc, element) =>
      acc.updated(element._1, element._2 + acc.get(element._1).getOrElse(0l))
    }
  }.persist(StorageLevel.MEMORY_AND_DISK)

  // Update the map with totals count.

  totalsRrd.foreach { rrd =>
    rrd.foreach { map =>
      map.foreach { case (k, v) =>
        totalsMap.get(k).getOrElse({
          val al = new AtomicLong(0)
          totalsMap.put(k, al)
          al
        }).addAndGet(v)
      }
    }
  }

  // salesWithOomph => (sale id => Map(msa -> count))   - only popular sales
  val salesWithOomph = salesWithMsaFrequency.filter{ item =>
    item._2.values.sum > 200
  }

  // Determine the chiSquared value for each sale.
  // salesWithChiSquared => (saleid -> chiSquared)
  val salesWithChiSquared: DStream[(Long, Double)] = salesWithOomph.map{ case (saleId, msaMap) =>
    try {
      val withOnlyFrequentMsas = msaMap.filter(pair => pair._2 > 30)

      val totalViewsForSale = withOnlyFrequentMsas.map(_._2).sum

      val expectedViews = withOnlyFrequentMsas.map{ case (k, v) => (k, totalsMap.get(k).map(_.get()).getOrElse(0l)) }
      val expectedTotal = expectedViews.map(_._2).sum

      val scaleFactor = totalViewsForSale / expectedTotal
      val expectedViewsScaled = expectedViews.mapValues(_ * scaleFactor)

      val chiSquared = withOnlyFrequentMsas.flatMap { case (msa, observedViews) =>
        expectedViewsScaled.get(msa).filter(_ != 0l).map(expected =>
          pow((observedViews - expected), 2) / expected
        )
      }.sum

      (saleId, chiSquared)
    }
    catch {
      case ex => {
        ex.printStackTrace
        (-1l, -1.0)
      }
    }
  }.persist(StorageLevel.MEMORY_AND_DISK)


  salesWithChiSquared.foreach{ rrd =>
    println("salesWithChiSquared!")
    try {
      rrd.foreach { case (saleId, chiSquaredValue) =>
        println("saleId: " + saleId + " chi: " + chiSquaredValue)
      }
    }
    catch {
      case ex => ex.printStackTrace()
    }
  }

  ssc.start
}
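
One general way to narrow down a failure like this is to exercise the pure Scala logic with no StreamingContext involved, so that anything still failing must come from the streaming side. Below is a minimal sketch, assuming the fold used in the reduceByKey and reduce closures above is pulled out into a helper. The names MergeCountsSketch and mergeCounts, and the sample MSA names, are hypothetical and not part of the posted code. This does not explain the missing block; it only helps rule the merge logic in or out.

```scala
object MergeCountsSketch {
  // Hypothetical helper: the same fold as in the closures above.
  // Every (key, count) entry of `a` is added into `b`.
  def mergeCounts(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    a.foldLeft(b) { case (acc, (key, count)) =>
      acc.updated(key, count + acc.getOrElse(key, 0L))
    }

  def main(args: Array[String]): Unit = {
    // Stand-in for one batch of (msa -> 1) maps; the values are made up.
    val batch = Seq(
      Map("New York" -> 1L),
      Map("Chicago" -> 1L),
      Map("New York" -> 1L)
    )
    // Plain collection reduce, mirroring what the DStream reduce does per batch.
    val totals = batch.reduce((x, y) => mergeCounts(x, y))
    println(totals)
  }
}
```

If a check like this passes, the merge itself is unlikely to be the culprit, and attention can shift to how the input blocks are received and stored.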