Not serializable error when running spark job over HBase table


Vinc Chan

Dec 5, 2013, 11:59:32 AM
to spark...@googlegroups.com
I'm following the HBaseTest example code and trying to add map and filters to the hbaseRDD object.

This is my code so far (executed in the shell):

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "data_table")

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

val s:String = "Element1"

val hBaseIndex = hBaseRDD.
  map(record => (Bytes.toString(record._1.get()).split("/"),
    Bytes.toString(record._2.getValue(Bytes.toBytes("data"), Bytes.toBytes("routval"))))).
  filter(record => record._1(2).equals(s) && record._1(0) >= "15" &&
    record._1(0) <= "30" && record._1(1).equals("field3")).
  map(record => (record._1(2), record._2.toDouble)).
  reduceByKey(_ + _)

hBaseIndex.count()

After calling count, I get this error:

org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration ....

However, if I don't reference the variable s in the filter and instead write the string literal directly inside it:

val hBaseIndex = hBaseRDD.
  map(record => (Bytes.toString(record._1.get()).split("/"),
    Bytes.toString(record._2.getValue(Bytes.toBytes("data"), Bytes.toBytes("routval"))))).
  filter(record => record._1(2).equals("Element1") && record._1(0) >= "15" &&
    record._1(0) <= "30" && record._1(1).equals("field3")).
  map(record => (record._1(2), record._2.toDouble)).
  reduceByKey(_ + _)

The count runs fine.

Can anyone explain why this happens, and how I can fix or work around it? I need to filter elements against a set that is created outside the closure.
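
One plausible explanation, sketched here without Spark or HBase so it can be run on its own: the shell may wrap each input line in a generated object, so a closure that mentions s captures that whole wrapper, including the non-serializable conf. This is an assumption about the shell's behavior, not something confirmed in this thread. The names FakeConfiguration, ShellLine, and ClosureCaptureDemo are hypothetical stand-ins, and plain Java serialization is used in place of Spark's closure shipping.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration: it is not Serializable.
class FakeConfiguration

// Hypothetical stand-in for the wrapper a shell might generate around the input lines.
class ShellLine extends Serializable {
  val conf = new FakeConfiguration
  val s: String = "Element1"

  // Mentions the field `s` (really `this.s`), so the closure captures `this`,
  // and serializing `this` also tries to serialize `conf`.
  def capturingFilter: String => Boolean = x => x == s

  // Copies the field into a local val first, so the closure captures only a String.
  def localCopyFilter: String => Boolean = {
    val localS = s
    x => x == localS
  }
}

object ClosureCaptureDemo {
  // Returns true if Java serialization accepts the object, false if it
  // fails with NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val line = new ShellLine
    println(isSerializable(line.capturingFilter)) // closure drags in the wrapper and its conf
    println(isSerializable(line.localCopyFilter)) // closure holds only the copied String
  }
}
```

If this is indeed the cause, the same idea would apply in the shell: copy the external value (or set) into a local val inside a method or block before using it in the closure, or keep conf out of the captured scope, for example by marking it @transient.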

Thanks, Vincent.