Hi,
I am using Spark 0.8.0 on EC2, with 3 slaves and 1 master in my cluster.
HBase 0.94.12 is installed on the master. It runs in pseudo-distributed mode, with a single region server on the master.
I have already rebuilt Spark with HBASE_VERSION = "0.94.12" on all nodes.
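For reference, the only build change I made was the HBase dependency version, followed by a fresh sbt/sbt assembly on every node. A rough sketch of the change (I am assuming HBASE_VERSION is the val in project/SparkBuild.scala that the examples project reads; please correct me if that is the wrong place):

  // project/SparkBuild.scala (sketch, not a verbatim diff):
  // point the HBase dependency at the version running on the cluster,
  // then rebuild with sbt/sbt assembly on each node.
  val HBASE_VERSION = "0.94.12"   // the stock value is "0.94.6"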
When reading data from HBase, I hit an "unread block data" error, as follows:
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: unread block data
...
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
Having searched other threads, I found that this issue often comes up when using Spark with HBase or Cassandra, but as far as I know it has never been fully resolved.
Some answers suggest changing the serializer. I noticed that JavaSerializer is used by default, so I thought I would give KryoSerializer a try.
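For the record, the switch itself is just system properties set before the SparkContext is created; one could also register the HBase classes explicitly through a KryoRegistrator. The registrator below is only my own illustrative sketch, not something taken from the Spark examples:

  import com.esotericsoftware.kryo.Kryo
  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.spark.serializer.KryoRegistrator

  // Register the classes the HBase scan actually ships around.
  class HBaseKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[ImmutableBytesWritable])
      kryo.register(classOf[Result])
    }
  }

  // Both properties must be set before the SparkContext is constructed.
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.kryo.registrator", "HBaseKryoRegistrator")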
My job is essentially the same as the HBaseTest example that ships with Spark, apart from some HBase configuration settings:
import scala.io.Source
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkContext

object hbaseTest {
  def main(args: Array[String]) {
    // Spark settings
    val namenodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
    val sparkPort = 7077
    val hdfsPort = 9010
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext("spark://" + namenodeURL + ":" + sparkPort, "HBaseTest",
      System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))

    // HBase config
    val conf = HBaseConfiguration.create()
    val tableName = "client"
    conf.setBoolean("hbase.cluster.distributed", true)
    conf.set("hbase.zookeeper.quorum", "ip-10-184-28-140.ec2.internal")
    conf.setInt("hbase.client.scanner.caching", 10000)
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Initialize the HBase table if necessary
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    // Read the table as an RDD of (ImmutableBytesWritable, Result) and count the rows
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hBaseRDD.count()

    System.exit(0)
  }
}
I have added System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") to my code, but it fails with the same error. It also looks as if the slaves do not pick up the Kryo serializer, since the stack trace still goes through JavaDeserializationStream, as the following log shows:
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:host.name=ip-10-184-28-140.ec2.internal
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.version=1.7.0_25
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.25.x86_64/jre
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.class.path=/root/spark/sbt/sbt-launch-0.11.3-2.jar
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:os.version=3.4.62-53.42.amzn1.x86_64
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:user.name=root
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:user.home=/root
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:user.dir=/root/clarabox
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=ip-10-184-28-140.ec2.internal:2181 sessionTimeout=180000 watcher=hconnection
13/10/29 14:49:59 INFO zookeeper.ClientCnxn: Opening socket connection to server ip-10-184-28-140.ec2.internal/10.184.28.140:2181
13/10/29 14:49:59 INFO zookeeper.RecoverableZooKeeper: The identifier of this process is 10...@ip-10-184-28-140.ec2.internal
13/10/29 14:49:59 INFO zookeeper.ClientCnxn: Socket connection established to ip-10-184-28-140.ec2.internal/10.184.28.140:2181, initiating session
13/10/29 14:49:59 INFO zookeeper.ClientCnxn: Session establishment complete on server ip-10-184-28-140.ec2.internal/10.184.28.140:2181, sessionid = 0x14203a7ae1a0020, negotiated timeout = 180000
13/10/29 14:50:01 INFO storage.MemoryStore: ensureFreeSpace(1720) called with curMem=0, maxMem=802789785
13/10/29 14:50:01 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 1720.0 B, free 765.6 MB)
13/10/29 14:50:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka://sparkE...@ip-10-184-29-190.ec2.internal:57164/user/Executor] with ID 0
13/10/29 14:50:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka://sparkE...@ip-10-164-92-149.ec2.internal:38325/user/Executor] with ID 2
13/10/29 14:50:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka://sparkE...@ip-10-154-186-155.ec2.internal:49833/user/Executor] with ID 1
13/10/29 14:50:01 INFO spark.SparkContext: Starting job: count at hbaseFetcher.scala:52
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Got job 0 (count at hbaseFetcher.scala:52) with 1 output partitions (allowLocal=false)
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Final stage: Stage 0 (count at hbaseFetcher.scala:52)
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Missing parents: List()
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Submitting Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at hbaseFetcher.scala:48), which has no missing parents
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at hbaseFetcher.scala:48)
13/10/29 14:50:01 INFO cluster.ClusterScheduler: Adding task set 0.0 with 1 tasks
13/10/29 14:50:01 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 14:50:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager ip-10-184-29-190.ec2.internal:38967 with 326.7 MB RAM
13/10/29 14:50:01 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1488 bytes in 285 ms
13/10/29 14:50:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager ip-10-154-186-155.ec2.internal:58949 with 326.7 MB RAM
13/10/29 14:50:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager ip-10-164-92-149.ec2.internal:52189 with 326.7 MB RAM
13/10/29 14:50:03 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0)
13/10/29 14:50:03 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2419)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1380)
at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1954)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1794)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 0 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 1 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 1]
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 2 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 0 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 2 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 2]
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 3 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 0 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 3 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 3]
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 4 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 1 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 4 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 4]
where ip-10-184-29-190.ec2.internal is the hostname of a slave.
Some say it is a compatibility problem between Spark and HBase. I will try HBase 0.94.6, which is the default HBase version for Spark, although I doubt that alone will resolve the problem.
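One more thing I plan to try, in case the mismatch is on the executor classpath rather than in the build: shipping the exact HBase jar that the region server uses alongside my job jar, so the workers deserialize the task with the same classes. A minimal sketch of what I mean (the jar path is hypothetical; namenodeURL and sparkPort are as in the job above):

  import org.apache.spark.SparkContext

  // Sketch: add the server's HBase jar to the jars shipped with the job.
  // Substitute the real location of hbase-0.94.12.jar for the made-up path below.
  val sc = new SparkContext("spark://" + namenodeURL + ":" + sparkPort, "HBaseTest",
    System.getenv("SPARK_HOME"),
    Seq("target/scala-2.9.3/test_2.9.3-0.1.jar",
        "/root/hbase/hbase-0.94.12.jar"))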
It would be nice if Spark provided a tutorial on using it with HBase in cluster mode. I have searched a lot but found very little detail.
Any help is highly appreciated. =)
Hao