unread block data when reading from hbase

Hao REN

Oct 29, 2013, 10:58:20 AM
to spark...@googlegroups.com
Hi,

I am using Spark 0.8.0 on EC2. There are 3 slaves and 1 master in my cluster.

HBase 0.94.12 is installed on the master. It runs in pseudo-distributed mode with only one region server, on the master.

I have already rebuilt Spark with HBASE_VERSION = "0.94.12" on all nodes.
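
For reference, the change just bumps the HBase version constant in the Spark build definition and re-runs sbt/sbt assembly. In a stock 0.8.0 checkout I believe the constant lives in project/SparkBuild.scala (please correct me if that is wrong):

  // project/SparkBuild.scala -- HBase dependency version used by the examples project
  val HBASE_VERSION = "0.94.12"  // the default is "0.94.6"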

When reading data from HBase, I encountered the following "unread block data" error:

13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: unread block data
...
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)

Having searched other threads, I found that this issue often occurs when using Spark with HBase/Cassandra. However, as far as I know, it has never been fully resolved.

Some answers suggest changing the serializer. I noticed that JavaSerializer is used by default, so I thought I could give KryoSerializer a try.
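
For Spark 0.8, as far as I understand, that means setting the serializer property before the SparkContext is created, and optionally registering the HBase classes through a KryoRegistrator. A rough sketch (the class and package names are just placeholders of mine):

  import com.esotericsoftware.kryo.Kryo
  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.spark.serializer.KryoRegistrator

  // Registers the HBase classes that the scan results carry.
  class HBaseKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[ImmutableBytesWritable])
      kryo.register(classOf[Result])
    }
  }

  // Before creating the SparkContext:
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  System.setProperty("spark.kryo.registrator", "mypackage.HBaseKryoRegistrator")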

My job is exactly the same as the HBaseTest example shipped with Spark, except for some HBase configuration settings:

import scala.io.Source

import org.apache.spark.SparkContext

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

object hbaseTest {

  def main(args: Array[String]) {

    // spark settings
    val namenodeURL = Source.fromFile("/root/spark-ec2/masters").mkString.trim
    val sparkPort = 7077
    val hdfsPort = 9010

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext("spark://" + namenodeURL + ":" + sparkPort, "HBaseTest",
        System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar"))

    val conf = HBaseConfiguration.create()

    val tableName = "client"
  
    // hbase config
    conf.set("hbase.rootdir", "hdfs://ec2-54-211-136-116.compute-1.amazonaws.com:9010/hbase")
    conf.setBoolean("hbase.cluster.distributed", true)
    conf.set("hbase.zookeeper.quorum", "ip-10-184-28-140.ec2.internal")
    conf.setInt("hbase.client.scanner.caching", 10000)

    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Initialize hBase table if necessary
    val admin = new HBaseAdmin(conf)
    if(!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }
}

I have added System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") to my code.
It doesn't help; I get the same error.
It also seems that the slaves don't use the right serializer, as the following log shows:

13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.3.3-1073969, built on 02/23/2011 22:27 GMT
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:host.name=ip-10-184-28-140.ec2.internal
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.version=1.7.0_25
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.home=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.25.x86_64/jre
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.class.path=/root/spark/sbt/sbt-launch-0.11.3-2.jar
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/tmp
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:os.version=3.4.62-53.42.amzn1.x86_64
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:user.name=root
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:user.home=/root
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Client environment:user.dir=/root/clarabox
13/10/29 14:49:59 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=ip-10-184-28-140.ec2.internal:2181 sessionTimeout=180000 watcher=hconnection
13/10/29 14:49:59 INFO zookeeper.ClientCnxn: Opening socket connection to server ip-10-184-28-140.ec2.internal/10.184.28.140:2181
13/10/29 14:49:59 INFO zookeeper.RecoverableZooKeeper: The identifier of this process is 10...@ip-10-184-28-140.ec2.internal
13/10/29 14:49:59 INFO zookeeper.ClientCnxn: Socket connection established to ip-10-184-28-140.ec2.internal/10.184.28.140:2181, initiating session
13/10/29 14:49:59 INFO zookeeper.ClientCnxn: Session establishment complete on server ip-10-184-28-140.ec2.internal/10.184.28.140:2181, sessionid = 0x14203a7ae1a0020, negotiated timeout = 180000
13/10/29 14:50:01 INFO storage.MemoryStore: ensureFreeSpace(1720) called with curMem=0, maxMem=802789785
13/10/29 14:50:01 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 1720.0 B, free 765.6 MB)
13/10/29 14:50:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka://sparkE...@ip-10-184-29-190.ec2.internal:57164/user/Executor] with ID 0
13/10/29 14:50:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka://sparkE...@ip-10-164-92-149.ec2.internal:38325/user/Executor] with ID 2
13/10/29 14:50:01 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka://sparkE...@ip-10-154-186-155.ec2.internal:49833/user/Executor] with ID 1
13/10/29 14:50:01 INFO spark.SparkContext: Starting job: count at hbaseFetcher.scala:52
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Got job 0 (count at hbaseFetcher.scala:52) with 1 output partitions (allowLocal=false)
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Final stage: Stage 0 (count at hbaseFetcher.scala:52)
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Missing parents: List()
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Submitting Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at hbaseFetcher.scala:48), which has no missing parents
13/10/29 14:50:01 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (NewHadoopRDD[0] at newAPIHadoopRDD at hbaseFetcher.scala:48)
13/10/29 14:50:01 INFO cluster.ClusterScheduler: Adding task set 0.0 with 1 tasks
13/10/29 14:50:01 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 14:50:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager ip-10-184-29-190.ec2.internal:38967 with 326.7 MB RAM
13/10/29 14:50:01 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1488 bytes in 285 ms
13/10/29 14:50:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager ip-10-154-186-155.ec2.internal:58949 with 326.7 MB RAM
13/10/29 14:50:01 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager ip-10-164-92-149.ec2.internal:52189 with 326.7 MB RAM
13/10/29 14:50:03 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0)
13/10/29 14:50:03 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2419)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1380)
at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1954)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1848)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1794)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 0 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 1 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 1]
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 2 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 0 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 2 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 2]
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 3 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 0 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 3 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 3]
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Starting task 0.0:0 as TID 4 on executor 0: ip-10-184-29-190.ec2.internal (PROCESS_LOCAL)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Serialized task 0.0:0 as 1489 bytes in 1 ms
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Lost TID 4 (task 0.0:0)
13/10/29 13:59:27 INFO cluster.ClusterTaskSetManager: Loss was due to java.lang.IllegalStateException: unread block data [duplicate 4]

where ip-10-184-29-190.ec2.internal is the hostname of a slave.

Some say it is a compatibility problem between Spark and HBase. I will give HBase 0.94.6 a try, since it is the default HBase version for Spark, but I don't think that will resolve the problem.

Would it be possible for Spark to provide a tutorial on using Spark with HBase in cluster mode? I searched a lot but found little detail.

Any help is highly appreciated. =)

Hao


Hao REN

Oct 30, 2013, 7:18:17 AM
to spark...@googlegroups.com
Update:

I just made it work by adding the HBase jar to the SparkContext:

 val sc = new SparkContext("spark://" + namenodeURL + ":" + sparkPort, "HBaseTest",
        System.getenv("SPARK_HOME"), Seq("target/scala-2.9.3/test_2.9.3-0.1.jar", "lib/hbase-0.94.12.jar"))

It works fine now.
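
If I understand it correctly, the jars listed in the SparkContext constructor are shipped to the executors, so presumably the same effect could be achieved with sc.addJar (just a guess on my part):

  sc.addJar("lib/hbase-0.94.12.jar")  // ship the HBase jar to the executors as well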

But I don't understand the workaround. As I said in my previous post, I built Spark with HBase 0.94.12 on all nodes via "sbt assembly",
so every node should already have an HBase jar. I have checked spark/lib_managed/jars, and the HBase jar is there.

So why can't the slaves find it?

Hao.