I've been working on this.
You need a custom mapper that takes the key/value pair
(ImmutableBytesWritable, RowResult) and outputs
(TypedBytesWritable, TypedBytesWritable). Then you can just use dumbo's
Java integration.
http://dumbotics.com/2009/06/16/integration-with-java-code/
I've made a mapper that should do that, but I've missed something, or
there is a bug.
I keep getting: struct.error: Invalid type byte: 50
Here's my mapper class (below).
Any ideas?
package fm.last.bobbie.mapred;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.typedbytes.TypedBytesWritable;
import org.apache.log4j.Logger;
/**
* Mapper class for fetching columns from hbase and returning typed
bytes for use with dumbo. Uses the depreciated
* mapred api, because hadoop streaming uses it.
*
* @author tims
*/
public class HBaseDumboMapper implements
Mapper<ImmutableBytesWritable, RowResult, TypedBytesWritable,
TypedBytesWritable> {
private static Logger log = Logger.getLogger(HBaseDumboMapper.class);
@Override
public void map(ImmutableBytesWritable key, RowResult value,
OutputCollector<TypedBytesWritable, TypedBytesWritable>
collector, Reporter reporter) throws IOException {
// family->qualifier->value
Map<byte[], Map<byte[], byte[]>> columns = new HashMap<byte[],
Map<byte[], byte[]>>();
for (Entry<byte[], Cell> entry : value.entrySet()) {
byte[] cellValue = entry.getValue().getValue();
String columnStr = Bytes.toString(entry.getKey());
String[] familyQualifier = columnStr.split(":");
if (familyQualifier.length != 2) {
throw new IOException("cell column not in form
family:qualifier, got " + columnStr);
}
byte[] family = Bytes.toBytes(familyQualifier[0]);
byte[] qualifier = Bytes.toBytes(familyQualifier[1]);
Map<byte[], byte[]> column = columns.get(family);
if (column == null) {
column = new HashMap<byte[], byte[]>();
}
column.put(qualifier, cellValue);
columns.put(family, column);
}
TypedBytesWritable row = new TypedBytesWritable();
row.setValue(value.getRow());
TypedBytesWritable tbColumns = new TypedBytesWritable();
tbColumns.setValue(columns);
collector.collect(row, tbColumns);
}
@Override
public void configure(JobConf job) {
}
@Override
public void close() throws IOException {
}
}
2009/7/27 Mat Lehmann <
matle...@web.de>: