I'm following the HBaseTest example code and trying to add maps and filters to the hBaseRDD object.
This is my code so far (executed in the shell):
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "data_table")
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val s: String = "Element1"
val hBaseIndex = hBaseRDD.
  map(record => (Bytes.toString(record._1.get()).split("/"),
                 Bytes.toString(record._2.getValue(Bytes.toBytes("data"), Bytes.toBytes("routval"))))).
  filter(record => record._1(2).equals(s) &&
                   record._1(0) >= "15" && record._1(0) <= "30" &&
                   record._1(1).equals("field3")).
  map(record => (record._1(2), record._2.toDouble)).
  reduceByKey(_ + _)
hBaseIndex.count()
After calling count, I get this error:
org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration ....
However, if I don't use the variable s in the filter and instead write the string literal directly inside it:
val hBaseIndex = hBaseRDD.
  map(record => (Bytes.toString(record._1.get()).split("/"),
                 Bytes.toString(record._2.getValue(Bytes.toBytes("data"), Bytes.toBytes("routval"))))).
  filter(record => record._1(2).equals("Element1") &&
                   record._1(0) >= "15" && record._1(0) <= "30" &&
                   record._1(1).equals("field3")).
  map(record => (record._1(2), record._2.toDouble)).
  reduceByKey(_ + _)
The count runs fine.
Can anyone explain why this happens, and how I can fix or work around it? I need to use an externally created set in my code to filter elements.
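For context, this is roughly what I eventually want to run; the set contents and the wantedElements name below are just placeholders I made up for this example:
// Filter rows against a set built outside the RDD operations
// (placeholder values; assumes the imports and hBaseRDD from above).
val wantedElements: Set[String] = Set("Element1", "Element5", "Element9")
val filtered = hBaseRDD.
  map(record => (Bytes.toString(record._1.get()).split("/"),
                 Bytes.toString(record._2.getValue(Bytes.toBytes("data"), Bytes.toBytes("routval"))))).
  filter(record => wantedElements.contains(record._1(2)) &&  // membership test against the external set
                   record._1(0) >= "15" && record._1(0) <= "30" &&
                   record._1(1).equals("field3")).
  map(record => (record._1(2), record._2.toDouble)).
  reduceByKey(_ + _)
filtered.count()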
Thanks, Vincent.