| Job job = new Job(getConf(), getConf().get("jobName")); |
|
job.setJarByClass(RubyMapReduce.class); |
|
job.setMapperClass(CassandraMapper.class); |
|
job.setReducerClass(CassandraReducer.class); |
|
job.setInputFormatClass(ColumnFamilyInputFormat.class); |
|
job.setMapOutputKeyClass(Text.class); |
|
job.setMapOutputValueClass(ObjectWritable.class); |
|
|
|
ConfigHelper.setRpcPort(job.getConfiguration(), getConf().get("cassandraPort")); |
|
ConfigHelper.setInitialAddress(job.getConfiguration(), getConf().get("cassandraHost")); |
|
ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner"); |
|
ConfigHelper.setInputColumnFamily(job.getConfiguration(), getConf().get("inputKeyspace"), |
|
getConf().get("inputColumnFamily")); |
|
|
|
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), getConf().get("outputKeyspace"), |
|
getConf().get("outputColumnFamily")); |
|
job.setOutputFormatClass(ColumnFamilyOutputFormat.class); |
|
SlicePredicate sp = new SlicePredicate(); |
|
SliceRange sr = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, |
|
MAX_COLUMNS_PER_ROW); |
| sp.setSlice_range(sr); |
|
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), sp); |
|
|
|
job.waitForCompletion(true); |
| return 0; |
Founder/CEO Spinn3r.com
Location: San Francisco, CA
Skype: burtonator
Skype-in: (415) 871-0687