Hi,
I'm new to Elephant Bird (EB) and a bit lost among the different options, but I'm a somewhat seasoned dev. I have a couple of TB of JSON files which I'd like to convert to Thrift
so that I can use them both with Hive and from a standalone Java program.
Today I tried installing the native-lzo package to use with EB, but I had quite a few issues getting it running. After that experience I'd be happy with the builtin Java compression (no native libs) or with no compression at all, since we're storing this on ZFS with compression enabled.
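For what it's worth, by "builtin Java compression" I mean something like the sketch below (untested; openCompressedWriter is just an illustrative name). As far as I understand, DefaultCodec falls back to the pure-Java java.util.zip implementation when the native zlib bindings aren't loaded:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.DefaultCodec;

// Sketch: open a SequenceFile writer with builtin zlib compression.
// DefaultCodec uses java.util.zip when no native libraries are present.
SequenceFile.Writer openCompressedWriter(FileSystem fs, Configuration conf, Path path)
        throws IOException {
    return SequenceFile.createWriter(fs, conf, path,
            NullWritable.class, BytesWritable.class,
            SequenceFile.CompressionType.BLOCK, new DefaultCodec());
}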
I'd like to know:
a) whether my approach is correct, and
b) why Hive is complaining about a missing LZO codec.
Any help is greatly appreciated.
What I *think* I want to do is convert the JSON files (~5 GB each) into SequenceFiles with a NullWritable key and, as the value, either the Thrift object itself or the serialized Thrift object as a byte array. I'm not sure if this is the best approach. Here is the conversion function (imports included for completeness):
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.net.URI;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public void jsonToSequenceFile(String src, String dst) throws Exception {
    long start = System.nanoTime();
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    Path path = new Path(dst);
    NullWritable key = NullWritable.get();
    BytesWritable value = new BytesWritable();
    TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
    TCfpEvent e = new TCfpEvent();
    long count = 0;
    SequenceFile.Writer writer = null;
    LineNumberReader r = new LineNumberReader(
            new InputStreamReader(new GZIPInputStream(new FileInputStream(src))));
    try {
        writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
        String line;
        // ready() only reports whether the next read would block, so it can
        // end the loop before EOF; read until readLine() returns null instead.
        while ((line = r.readLine()) != null) {
            parseLine(line, e);
            // serialize() already returns a fresh byte[]; no intermediate
            // ByteArrayOutputStream is needed.
            byte[] data = serializer.serialize(e);
            value.set(data, 0, data.length);
            writer.append(key, value);
        }
        count = r.getLineNumber(); // capture before the reader is closed
    } finally {
        IOUtils.closeStream(writer);
        IOUtils.closeStream(r);
    }
    long t = System.nanoTime() - start;
    System.out.format("[JSON -> sequence file] %s -> %s. Processed %d records in %2.3f sec (%d records/sec)\n",
            src, dst, count, t / 1000000000d, count * 1000000000 / t);
}
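For the standalone Java program, I'm planning to read the files back roughly like this (again just an untested sketch; readSequenceFile is an illustrative name and TCfpEvent is my generated Thrift class):

import java.net.URI;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TBinaryProtocol;

public void readSequenceFile(String src) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(src), conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(src), conf);
    NullWritable key = NullWritable.get();
    BytesWritable value = new BytesWritable();
    TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
    TCfpEvent e = new TCfpEvent();
    try {
        while (reader.next(key, value)) {
            // BytesWritable's backing array may be larger than the record,
            // so trim to the logical length before deserializing.
            deserializer.deserialize(e, Arrays.copyOf(value.getBytes(), value.getLength()));
            // ... work with e ...
        }
    } finally {
        IOUtils.closeStream(reader);
    }
}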
I've then created an external table in Hive, as described in the EB documentation:
CREATE EXTERNAL TABLE cfp
PARTITIONED BY (dt STRING)
ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer"
WITH SERDEPROPERTIES (
  "serialization.class" = "TCfpEvent",
  "serialization.format" = "org.apache.thrift.protocol.TBinaryProtocol")
STORED AS
  INPUTFORMAT "com.twitter.elephantbird.mapred.input.DeprecatedRawMultiInputFormat"
  OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";

ALTER TABLE cfp ADD PARTITION (dt = '2014-01-05') LOCATION '/Users/frschroeder/cfp/thrift.seq';
I then tried a SELECT COUNT(*) and got the following error:
java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
My question is: why is it looking for the LZO codec when I didn't specify compression anywhere?
Thx
Frank