I want to read/write data from Bigtable in PySpark. To do that, I am trying the example below:
from __future__ import print_function
import sys
from pyspark import SparkContext
if __name__ == "__main__":
    if len(sys.argv) != 9:
        print("""
        Usage: hbase_bigtable_output <project> <zone> <cluster> <table> <row> <family> <qualifier> <value>
        Assumes you have created <table> with column family <family> in Bigtable cluster <cluster>
        """, file=sys.stderr)
        exit(-1)

    project = sys.argv[1]
    zone = sys.argv[2]
    cluster = sys.argv[3]
    table = sys.argv[4]

    sc = SparkContext(appName="HBaseOutputFormat")

    # Point the HBase client at Bigtable and configure the Hadoop output format.
    conf = {"hbase.client.connection.impl": "com.google.cloud.bigtable.hbase1_1.BigtableConnection",
            "google.bigtable.project.id": project,
            "google.bigtable.zone.name": zone,
            "google.bigtable.cluster.name": cluster,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    # Converters from the spark-examples jar that turn Python strings into HBase types.
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    # Write a single row: (rowkey, [rowkey, family, qualifier, value])
    sc.parallelize([sys.argv[5:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
        conf=conf,
        keyConverter=keyConv,
        valueConverter=valueConv)

    sc.stop()
Command:
gcloud dataproc jobs submit pyspark HBaseOutputFormat.py --cluster <clustername> --properties=^#^spark.jars.packages=com.google.cloud.bigtable:bigtable-hbase-1.1:0.2.2,org.apache.hbase:hbase-server:1.1.1,org.apache.hbase:hbase-common:1.1.1#spark.jars=/usr/lib/spark/examples/jars/spark-examples.jar
But it's throwing an error:
16/11/02 07:57:46 ERROR org.apache.spark.api.python.Converter: Failed to load converter: org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter
Traceback (most recent call last):
  File "/tmp/97ee9dc4-6ad7-490b-888a-8ecc7421a438/hbase_bigtable_input.py", line 48, in <module>
    conf=conf)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 646, in newAPIHadoopRDD
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
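For context, the traceback comes from the read-side script (hbase_bigtable_input.py), which is roughly the stock Spark hbase_inputformat.py example pointed at Bigtable with the same conf as above. A simplified sketch of what it does (the keyConverter is the class named in the error; the valueConverter follows the stock example):

from __future__ import print_function
import sys

from pyspark import SparkContext

if __name__ == "__main__":
    project, zone, cluster, table = sys.argv[1:5]

    sc = SparkContext(appName="HBaseInputFormat")

    # Same Bigtable connection settings as the write side, plus the input table.
    conf = {"hbase.client.connection.impl": "com.google.cloud.bigtable.hbase1_1.BigtableConnection",
            "google.bigtable.project.id": project,
            "google.bigtable.zone.name": zone,
            "google.bigtable.cluster.name": cluster,
            "hbase.mapreduce.inputtable": table}

    # These converter classes are expected to live in the spark-examples jar.
    keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

    rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)

    print(rdd.take(5))
    sc.stop()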
I then unzipped the jar to see which classes it contains:
> unzip /usr/lib/spark/examples/jars/spark-examples_2.11-2.0.1.jar
Output for pythonconverter classes:
org/apache/spark/examples/pythonconverters/
AvroConversionUtil.class
AvroConversionUtil$.class
AvroConversionUtil$$anonfun$unpackArray$1.class
AvroConversionUtil$$anonfun$unpackArray$2.class
AvroConversionUtil$$anonfun$unpackMap$1.class
AvroConversionUtil$$anonfun$unpackRecord$1.class
AvroWrapperToJavaConverter.class
IndexedRecordToJavaConverter.class
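The same check can also be scripted with Python's built-in zipfile module (using the jar path from the Dataproc 1.1 image above):

import zipfile

# List everything under the pythonconverters package in the examples jar.
jar_path = "/usr/lib/spark/examples/jars/spark-examples_2.11-2.0.1.jar"
with zipfile.ZipFile(jar_path) as jar:
    for name in jar.namelist():
        if name.startswith("org/apache/spark/examples/pythonconverters/"):
            print(name)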
This means the examples jar shipped with Google Cloud Dataproc 1.1 does not contain the Python converters for HBase.
Does anyone know a workaround to get a jar with these converters onto Dataproc so I can access Bigtable from a PySpark job?
Highly Appreciated!
Thank You,
Revan