Thanks Matei,
Here is the sequence:
val rootInputFormat =
  classOf[PipesBinaryMatrixInputFormatTempl[Text, BytesWritable]]
val hadoopRDD = new HadoopRDD(sc, conf, rootInputFormat,
  classOf[Text], classOf[BytesWritable], minSplits)
val mappedFloats = hadoopRDD
  .map(br => (br._1.toString(), computeFloat(br._2)))
  .groupByKey()
println("Records processed " + mappedFloats.count)
computeFloat creates a List[Float] from the BytesWritable.
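For reference, the decoding step is essentially the following. This is a minimal sketch under the assumption that the valid region of the BytesWritable holds a flat sequence of big-endian IEEE-754 floats; the real record layout may differ, and the helper name `bytesToFloats` is mine, not from the actual code:

```scala
import java.nio.ByteBuffer

// Hypothetical sketch of computeFloat's decoding step: interpret the
// valid region of a byte array as big-endian IEEE-754 floats (an
// assumption; the actual record layout may differ). With a
// BytesWritable bw, this would be called as
// bytesToFloats(bw.getBytes, bw.getLength), since getBytes can return
// a backing array longer than the valid data.
def bytesToFloats(bytes: Array[Byte], length: Int): List[Float] = {
  val buf = ByteBuffer.wrap(bytes, 0, length).asFloatBuffer()
  val out = new Array[Float](buf.remaining())
  buf.get(out) // bulk-copy the decoded floats
  out.toList
}
```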
This works when run locally; when run on Mesos, I get the following stack trace:
12/03/30 22:09:15 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.MesosScheduler: Temp directory for JARs: /tmp/spark-b8578f01-cca7-4cf1-80d4-eddf27d35a10
12/03/30 22:09:15 INFO server.Server: jetty-7.x.y-SNAPSHOT
12/03/30 22:09:15 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:34770 STARTING
12/03/30 22:09:15 INFO spark.MesosScheduler: JAR server started at http://192.168.1.64:34770
12/03/30 22:09:15 INFO spark.MesosScheduler: Registered as framework ID 201203291159-0-0014
12/03/30 22:09:15 INFO spark.MesosScheduler: Registered as framework ID 201203291159-0-0015
12/03/30 22:09:16 INFO mapred.FileInputFormat: Total input paths to process : 5
12/03/30 22:09:16 INFO spark.SparkContext: Starting job...
12/03/30 22:09:16 INFO spark.CacheTracker: Registering RDD ID 2 with cache
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Registering RDD 2 with 8 partitions
12/03/30 22:09:16 INFO spark.CacheTracker: Registering RDD ID 1 with cache
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Registering RDD 1 with 40 partitions
12/03/30 22:09:16 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Registering RDD 0 with 40 partitions
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Asked for current cache locations
12/03/30 22:09:16 INFO spark.MesosScheduler: Final stage: Stage 0
12/03/30 22:09:16 INFO spark.MesosScheduler: Parents of final stage: List(Stage 1)
12/03/30 22:09:16 INFO spark.MesosScheduler: Missing parents: List(Stage 1)
12/03/30 22:09:16 INFO spark.MesosScheduler: Submitting Stage 1, which has no missing parents
12/03/30 22:09:16 INFO spark.MesosScheduler: Got a job with 40 tasks
12/03/30 22:09:16 INFO spark.MesosScheduler: Adding job with ID 0
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:0 as TID 0 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:1 as TID 1 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:2 as TID 2 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:3 as TID 3 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:4 as TID 4 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:5 as TID 5 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:6 as TID 6 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:7 as TID 7 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:19 INFO spark.SimpleJob: Lost TID 0 (task 0:0)
12/03/30 22:09:19 INFO spark.SimpleJob: Loss was due to java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
    at spark.JavaSerializationStream.writeObject(JavaSerializer.scala:7)
    at spark.ShuffleMapTask$$anonfun$run$1$$anonfun$apply$mcVI$sp$1.apply(ShuffleMapTask.scala:31)
    at spark.ShuffleMapTask$$anonfun$run$1$$anonfun$apply$mcVI$sp$1.apply(ShuffleMapTask.scala:31)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.Iterator$class.foreach(Iterator.scala:660)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
    at spark.ShuffleMapTask.run(ShuffleMapTask.scala:28)
    at spark.ShuffleMapTask.run(ShuffleMapTask.scala:9)
    at spark.Executor$TaskRunner.run(Executor.scala:71)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)