Serialization problems for org.apache.hadoop.io.Text keys in reduce

cearl

Mar 29, 2012, 11:32:49 AM
to Spark Users
Hi All,
I'm attempting a reduceByKey on a Hadoop RDD, and it throws
java.io.NotSerializableException: org.apache.hadoop.io.Text
The relevant calls are:
    val rootInputFormat =
      classOf[PipesBinaryMatrixInputFormatTempl[Text, BytesWritable]]
    val hadoopRDD = new HadoopRDD(sc, conf, rootInputFormat,
      classOf[Text], classOf[BytesWritable], minSplits)
    val processedEntries = hadoopRDD
      .map(br => processEntries(br._1, br._2))
      .groupByKey()
      .map(entry => packEnties(entry))
      .pipe(process)
The exception is thrown when I attempt an action on processedEntries,
for example:
    processedEntries.foreach(s => println(s))

However, when the pipe is called without the reduce step -- that is
record by record to the pipe without combining -- the job goes to
completion without error.

I would have assumed that Text provides serialization capability.
I would also think that something conceptually equivalent to a Hadoop
MapReduce combiner would work here, without requiring serialization
to send the data between nodes?
C

Matei Zaharia

Mar 29, 2012, 11:37:46 AM
to spark...@googlegroups.com
The problem is that Text actually does not support Java serialization, only Hadoop's internal serialization. When you do reduceByKey and then pipe, some combining happens locally as you said, but then each map task needs to send a combined object, which I'm guessing contains a Text, to the reduce tasks. On the other hand, when you pipe without the reduce step, the Text objects only exist within one process, and you get back strings from pipe(). The easiest fix would be to use something other than Text in the reduceByKey.
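
To make the failure mode concrete, here is a minimal sketch that reproduces it without Spark or Hadoop on the classpath. FakeText is a hypothetical stand-in for org.apache.hadoop.io.Text (it is not the real class); the only property it shares is that it does not implement java.io.Serializable. The helper javaSerializable is also just an illustrative name. A tuple keyed by such an object is rejected by ObjectOutputStream, while the same tuple keyed by a String is accepted.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.hadoop.io.Text: it wraps a string but
// does NOT implement java.io.Serializable.
final class FakeText(val value: String) {
  override def toString: String = value
}

object JavaSerializationCheck {
  // Returns true if Java serialization accepts the object,
  // false if it throws NotSerializableException.
  def javaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // A record keyed by the Writable-like object, as it would look before a shuffle.
  val textKeyed: (FakeText, List[Float]) = (new FakeText("row-1"), List(1.0f, 2.0f))

  // The same record after converting the key to a plain String.
  val stringKeyed: (String, List[Float]) = (textKeyed._1.toString, textKeyed._2)
}
```

The tuple itself is serializable; the failure comes from the non-serializable field inside it, which is consistent with a trace that passes through defaultWriteFields before failing.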

Matei

cearl

Mar 30, 2012, 1:41:25 PM
to Spark Users
Matei,
I had tried converting the Text to a String, and then to a Long via
hashCode, in a map. This seems to produce the same error, I assume
because the chain is evaluated lazily and the other threads may in
fact need the Text?
Would I face the same serialization problem with any class that
extends Writable as a key?
I'm sure I'm thinking about it the wrong way.
Charles

Matei Zaharia

Mar 30, 2012, 2:52:47 PM
to spark...@googlegroups.com
What's the exact stack trace, and the sequence of RDD operations you're doing? This problem should only happen if you call reduceByKey or groupByKey on an RDD containing Text or other Writable objects. If it contained Strings, it doesn't matter that Text was used along the way to create it, because the Text objects never have to leave the machine.
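
As a rough sketch of what "containing Strings" could look like in practice, the snippet below converts both key and value to plain JVM types before any grouping. Everything here is an assumption for illustration: bytesToFloats is a hypothetical decoder that treats the payload as 4-byte big-endian floats (the real format is not given in this thread), and groupLocally uses ordinary Scala collections as a local stand-in for map(...).groupByKey(). The explicit length argument is there because BytesWritable's backing array can be longer than its valid length.

```scala
import java.nio.ByteBuffer

object PlainTypesBeforeShuffle {
  // Hypothetical decoder: assumes the first `length` bytes are a sequence of
  // 4-byte big-endian floats; any trailing partial float is ignored.
  def bytesToFloats(bytes: Array[Byte], length: Int): List[Float] = {
    val buf = ByteBuffer.wrap(bytes, 0, length - (length % 4))
    val out = List.newBuilder[Float]
    while (buf.remaining() >= 4) out += buf.getFloat()
    out.result()
  }

  // Local stand-in for groupByKey: the records already hold only
  // String keys and List[Float] values, both Java-serializable.
  def groupLocally(records: Seq[(String, List[Float])]): Map[String, Seq[List[Float]]] =
    records.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
}
```

With a real (Text, BytesWritable) record br, the conversion inside the map would be along the lines of (br._1.toString, bytesToFloats(br._2.getBytes, br._2.getLength)), so that no Writable survives into the grouped RDD.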

Matei

cearl

Mar 30, 2012, 10:11:39 PM
to Spark Users
Thanks Matei,
Here is the sequence
    val rootInputFormat =
      classOf[PipesBinaryMatrixInputFormatTempl[Text, BytesWritable]]
    val hadoopRDD = new HadoopRDD(sc, conf, rootInputFormat,
      classOf[Text], classOf[BytesWritable], minSplits)
    val mappedFloats = hadoopRDD
      .map(br => (br._1.toString(), computeFloat(br._2)))
      .groupByKey()
    println("Records processed " + mappedFloats.count)
where computeFloat creates a List[Float] from the BytesWritable.
It works when run locally. When run on Mesos, the following is the
stack trace:

12/03/30 22:09:15 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.CacheTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.MapOutputTrackerActor: Registered actor on port 7077
12/03/30 22:09:15 INFO spark.MesosScheduler: Temp directory for JARs: /tmp/spark-b8578f01-cca7-4cf1-80d4-eddf27d35a10
12/03/30 22:09:15 INFO server.Server: jetty-7.x.y-SNAPSHOT
12/03/30 22:09:15 INFO server.AbstractConnector: Started SelectChann...@0.0.0.0:34770 STARTING
12/03/30 22:09:15 INFO spark.MesosScheduler: JAR server started at http://192.168.1.64:34770
12/03/30 22:09:15 INFO spark.MesosScheduler: Registered as framework ID 201203291159-0-0014
12/03/30 22:09:15 INFO spark.MesosScheduler: Registered as framework ID 201203291159-0-0015
12/03/30 22:09:16 INFO mapred.FileInputFormat: Total input paths to process : 5
12/03/30 22:09:16 INFO spark.SparkContext: Starting job...
12/03/30 22:09:16 INFO spark.CacheTracker: Registering RDD ID 2 with cache
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Registering RDD 2 with 8 partitions
12/03/30 22:09:16 INFO spark.CacheTracker: Registering RDD ID 1 with cache
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Registering RDD 1 with 40 partitions
12/03/30 22:09:16 INFO spark.CacheTracker: Registering RDD ID 0 with cache
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Registering RDD 0 with 40 partitions
12/03/30 22:09:16 INFO spark.CacheTrackerActor: Asked for current cache locations
12/03/30 22:09:16 INFO spark.MesosScheduler: Final stage: Stage 0
12/03/30 22:09:16 INFO spark.MesosScheduler: Parents of final stage: List(Stage 1)
12/03/30 22:09:16 INFO spark.MesosScheduler: Missing parents: List(Stage 1)
12/03/30 22:09:16 INFO spark.MesosScheduler: Submitting Stage 1, which has no missing parents
12/03/30 22:09:16 INFO spark.MesosScheduler: Got a job with 40 tasks
12/03/30 22:09:16 INFO spark.MesosScheduler: Adding job with ID 0
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:0 as TID 0 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:1 as TID 1 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:2 as TID 2 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:3 as TID 3 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:4 as TID 4 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:5 as TID 5 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:6 as TID 6 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:16 INFO spark.SimpleJob: Starting task 0:7 as TID 7 on slave 201203291159-0-0: azad.gateway.2wire.net (preferred)
12/03/30 22:09:19 INFO spark.SimpleJob: Lost TID 0 (task 0:0)
12/03/30 22:09:19 INFO spark.SimpleJob: Loss was due to java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1416)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
    at spark.JavaSerializationStream.writeObject(JavaSerializer.scala:7)
    at spark.ShuffleMapTask$$anonfun$run$1$$anonfun$apply$mcVI$sp$1.apply(ShuffleMapTask.scala:31)
    at spark.ShuffleMapTask$$anonfun$run$1$$anonfun$apply$mcVI$sp$1.apply(ShuffleMapTask.scala:31)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.Iterator$class.foreach(Iterator.scala:660)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:93)
    at spark.ShuffleMapTask.run(ShuffleMapTask.scala:28)
    at spark.ShuffleMapTask.run(ShuffleMapTask.scala:9)
    at spark.Executor$TaskRunner.run(Executor.scala:71)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:679)

Matei Zaharia

Mar 30, 2012, 10:25:59 PM
to spark...@googlegroups.com
Weird; I just tried a similar thing and it worked. Are you sure that you don't have an older version of your code on one of the nodes, or something like that?

Matei
