Description:
|
When I turn on Kryo serialization, I get the following error as I increase the size of my input dataset (from ~10 GB to ~100 GB). The issue does not manifest when Kryo is turned off.
I have code that successfully reads files and parses them into an RDD[(String, Vector)], which can then be .count()'ed. I then run a .flatMap over it with a function of the following signature:
def expandData(x: (String, Vector)): Seq[(String, Float, Vector)]
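A minimal sketch of the pipeline described above (the parsing step and names like parseLine and inputPath are hypothetical; only expandData and the RDD types come from the report):

```scala
// Hypothetical reconstruction of the failing job. "Vector" is the
// Float-backed org.mlbase.Vector described at the end of this report.
val parsed: RDD[(String, Vector)] = sc.textFile(inputPath).map(parseLine)
parsed.count()  // this count succeeds

// expandData: (String, Vector) => Seq[(String, Float, Vector)]
val expanded: RDD[(String, Float, Vector)] = parsed.flatMap(expandData)
expanded.count()  // this count crashes under Kryo at ~100 GB input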
Running a .count() on that RDD crashes; the stack trace of the failed task looks like this:
13/05/31 00:16:53 INFO cluster.TaskSetManager: Finished TID 2024 in 23594 ms (progress: 10/1000)
13/05/31 00:16:53 INFO scheduler.DAGScheduler: Completed ResultTask(3, 24)
13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:151
13/05/31 00:16:53 INFO cluster.TaskSetManager: Starting task 3.0:175 as TID 2161 on slave 14: ip-10-62-199-77.ec2.internal:40850 (NODE_LOCAL)
13/05/31 00:16:53 INFO cluster.TaskSetManager: Serialized task 3.0:175 as 2832 bytes in 0 ms
13/05/31 00:16:53 INFO cluster.TaskSetManager: Lost TID 2053 (task 3.0:49)
13/05/31 00:16:53 INFO cluster.TaskSetManager: Loss was due to com.esotericsoftware.kryo.KryoException
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
elements (org.mlbase.Vector)
_3 (scala.Tuple3)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:571)
at spark.KryoSerializationStream.writeObject(KryoSerializer.scala:26)
at spark.serializer.SerializationStream$class.writeAll(Serializer.scala:63)
at spark.KryoSerializationStream.writeAll(KryoSerializer.scala:21)
at spark.storage.BlockManager.dataSerialize(BlockManager.scala:910)
at spark.storage.MemoryStore.putValues(MemoryStore.scala:61)
at spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:584)
at spark.storage.BlockManager.put(BlockManager.scala:580)
at spark.CacheManager.getOrCompute(CacheManager.scala:55)
at spark.RDD.iterator(RDD.scala:207)
at spark.scheduler.ResultTask.run(ResultTask.scala:84)
at spark.executor.Executor$TaskRunner.run(Executor.scala:104)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:679)
13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:151
13/05/31 00:16:53 INFO cluster.TaskSetManager: Starting task 3.0:49 as TID 2162 on slave 12: ip-10-11-46-255.ec2.internal:38878 (NODE_LOCAL)
13/05/31 00:16:53 INFO cluster.TaskSetManager: Serialized task 3.0:49 as 2832 bytes in 0 ms
13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:152
13/05/31 00:16:54 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_7_257 in mem
My Kryo Registrator looks like this:
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Vector])
    kryo.register(classOf[String])
    kryo.register(classOf[Float])
    kryo.register(classOf[Tuple3[String, Float, Vector]])
    kryo.register(classOf[Seq[Tuple3[String, Float, Vector]]])
    kryo.register(classOf[Map[String, Vector]])
  }
}
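For context, in this Spark version (0.7.x-era, per the stack trace package names) Kryo and the registrator are wired in via system properties before the SparkContext is created; the snippet below is a configuration sketch, assuming MyRegistrator is on the classpath:

```scala
// Enable Kryo and point Spark at the custom registrator.
// These must be set before the SparkContext is constructed.
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyRegistrator")
```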
"Vector" here is org.mlbase.Vector, a slightly modified version of spark.util.Vector that uses Floats instead of Doubles.
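A sketch of what such a Float-backed Vector might look like (the field name elements is taken from the Kryo serialization trace above; the methods shown are illustrative, not the actual org.mlbase.Vector source):

```scala
// Hypothetical Float-backed analogue of spark.util.Vector.
// The Kryo trace points at the "elements" field during serialization.
class Vector(val elements: Array[Float]) extends Serializable {
  def length: Int = elements.length
  def apply(i: Int): Float = elements(i)
  def +(other: Vector): Vector =
    new Vector(elements.zip(other.elements).map { case (a, b) => a + b })
}
```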
|