Process HBase Tables with Dumbo


Mat Lehmann

Jul 27, 2009, 7:32:18 AM
to dumbo-user
Hi,

is there an established way to process HBase tables with dumbo?

I am using HBase trunk with dumbo 0.21.

Tried

dumbo mrtest.py -D hbase.mapred.tablecolumns="a:name" -output out \
  -input content -hadoop ~/work/hadoop \
  -inputformat org.apache.hadoop.hbase.mapred.TableInputFormat

which gives the error:

INFO: inputting typed bytes
INFO: buffersize = 168960
INFO: outputting typed bytes
Traceback (most recent call last):
  File "/usr/lib/python2.6/runpy.py", line 122, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.6/runpy.py", line 34, in _run_code
    exec code in run_globals
  File "/home/mat/work/lalisio/data/hadoop-0.20/mapred/local/taskTracker/jobcache/job_200907271130_0005/attempt_200907271130_0005_m_000000_0/work/mrtest.py", line 10, in <module>
    dumbo.run(mapper,reducer,combiner=reducer)
  File "build/bdist.linux-i686/egg/dumbo/core.py", line 611, in run
  File "build/bdist.linux-i686/egg/typedbytes.py", line 371, in writes
  File "build/bdist.linux-i686/egg/typedbytes.py", line 237, in _writes
  File "build/bdist.linux-i686/egg/typedbytes.py", line 210, in flatten
  File "build/bdist.linux-i686/egg/dumbo/core.py", line 748, in redfunc_iter
  File "build/bdist.linux-i686/egg/dumbo/core.py", line 755, in <genexpr>
  File "build/bdist.linux-i686/egg/dumbo/util.py", line 32, in sorted
  File "build/bdist.linux-i686/egg/dumbo/util.py", line 32, in <genexpr>
  File "build/bdist.linux-i686/egg/dumbo/core.py", line 733, in mapfunc_iter
  File "build/bdist.linux-i686/egg/typedbytes.py", line 355, in reads
  File "build/bdist.linux-i686/egg/typedbytes.py", line 85, in _reads
  File "build/bdist.linux-i686/egg/typedbytes.py", line 74, in _read
  File "build/bdist.linux-i686/egg/typedbytes.py", line 163, in invalid_typecode
struct.error: Invalid type byte: 50
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 255
  at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:362)
  at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:564)
  at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
  at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
  at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
  at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:356)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
  at org.apache.hadoop.mapred.Child.main(Child.java:170)

Probably the RowResult from HBase cannot be deserialized by typedbytes.

So do I need to write my own InputFormat, or is there an existing one?

Or is there a completely different way to process HBase tables with
dumbo?

I'd be grateful for any hints.

Mat

Tim Sell

Jul 27, 2009, 7:43:41 AM
to dumbo...@googlegroups.com
I've been working on this.
You need a custom mapper which takes the key/value pair
(ImmutableBytesWritable, RowResult) and outputs
(TypedBytesWritable, TypedBytesWritable). Then you can just use dumbo's
Java integration.

http://dumbotics.com/2009/06/16/integration-with-java-code/

I've made a mapper that should do that, but either I've missed
something or there is a bug.
I keep getting: struct.error: Invalid type byte: 50
Here's my mapper class; any ideas?



package fm.last.bobbie.mapred;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.typedbytes.TypedBytesWritable;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

/**
 * Mapper class for fetching columns from hbase and returning typed bytes
 * for use with dumbo. Uses the deprecated mapred api, because hadoop
 * streaming uses it.
 *
 * @author tims
 */
public class HBaseDumboMapper implements
    Mapper<ImmutableBytesWritable, RowResult, TypedBytesWritable, TypedBytesWritable> {

  private static Logger log = Logger.getLogger(HBaseDumboMapper.class);

  @Override
  public void map(ImmutableBytesWritable key, RowResult value,
      OutputCollector<TypedBytesWritable, TypedBytesWritable> collector,
      Reporter reporter) throws IOException {

    // family -> qualifier -> value
    Map<byte[], Map<byte[], byte[]>> columns = new HashMap<byte[], Map<byte[], byte[]>>();

    for (Entry<byte[], Cell> entry : value.entrySet()) {
      byte[] cellValue = entry.getValue().getValue();
      String columnStr = Bytes.toString(entry.getKey());
      String[] familyQualifier = columnStr.split(":");
      if (familyQualifier.length != 2) {
        throw new IOException("cell column not in form family:qualifier, got " + columnStr);
      }
      byte[] family = Bytes.toBytes(familyQualifier[0]);
      byte[] qualifier = Bytes.toBytes(familyQualifier[1]);
      Map<byte[], byte[]> column = columns.get(family);
      if (column == null) {
        column = new HashMap<byte[], byte[]>();
      }
      column.put(qualifier, cellValue);
      columns.put(family, column);
    }

    TypedBytesWritable row = new TypedBytesWritable();
    row.setValue(value.getRow());

    TypedBytesWritable tbColumns = new TypedBytesWritable();
    tbColumns.setValue(columns);

    collector.collect(row, tbColumns);
  }

  @Override
  public void configure(JobConf job) {
  }

  @Override
  public void close() throws IOException {
  }
}



2009/7/27 Mat Lehmann <matle...@web.de>:

Klaas Bosteels

Jul 27, 2009, 8:00:40 AM
to dumbo...@googlegroups.com
Think you might want to avoid passing byte[]s to
TypedBytesWritable.setValue() (either directly or indirectly as part
of a collection). What gets called under the hood is the following
method:

/**
 * Writes a Java object as a typed bytes sequence.
 *
 * @param obj the object to be written
 * @throws IOException
 */
public void write(Object obj) throws IOException {
  if (obj instanceof Buffer) {
    writeBytes(((Buffer) obj).get());
  } else if (obj instanceof Byte) {
    writeByte((Byte) obj);
  } else if (obj instanceof Boolean) {
    writeBool((Boolean) obj);
  } else if (obj instanceof Integer) {
    writeInt((Integer) obj);
  } else if (obj instanceof Long) {
    writeLong((Long) obj);
  } else if (obj instanceof Float) {
    writeFloat((Float) obj);
  } else if (obj instanceof Double) {
    writeDouble((Double) obj);
  } else if (obj instanceof String) {
    writeString((String) obj);
  } else if (obj instanceof ArrayList) {
    writeVector((ArrayList) obj);
  } else if (obj instanceof List) {
    writeList((List) obj);
  } else if (obj instanceof Map) {
    writeMap((Map) obj);
  } else {
    throw new RuntimeException("cannot write objects of this type");
  }
}

Not sure why you're not getting the "cannot write objects of this
type" exception, but I'd expect things to work when you use
org.apache.hadoop.record.Buffer objects instead of byte[]s (or convert
the byte[]s to other proper objects first).
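
For example, something like this in the map() method should work (just
an untested sketch based on the class above):

<pre>
// Untested sketch (fragment of the map() method above): wrap the raw
// byte[] row key in an org.apache.hadoop.record.Buffer, which the
// write() method serializes as the typed bytes "bytes" type.
TypedBytesWritable row = new TypedBytesWritable();
row.setValue(new Buffer(value.getRow()));  // needs import org.apache.hadoop.record.Buffer
// ...or convert the bytes to a proper object instead:
// row.setValue(Bytes.toString(value.getRow()));
</pre>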

-Klaas

Klaas Bosteels

Jul 27, 2009, 8:39:56 AM
to dumbo...@googlegroups.com
There seems to be a bug in the code that converts Buffers to typed
bytes by the way :/

http://issues.apache.org/jira/browse/MAPREDUCE-808

-Klaas

Tim Sell

Jul 27, 2009, 9:23:24 AM
to dumbo...@googlegroups.com
Ahh, thanks.
I changed my mapper to just convert everything to strings, and it still
didn't work. So I tried this:

@Override
public void map(ImmutableBytesWritable key, RowResult value,
    OutputCollector<TypedBytesWritable, TypedBytesWritable> collector,
    Reporter reporter) throws IOException {

  TypedBytesWritable row = new TypedBytesWritable();
  row.setValue(new String("help"));

  TypedBytesWritable tbColumns = new TypedBytesWritable();
  tbColumns.setValue(new String("I'm stuck in a row factory"));

  collector.collect(row, tbColumns);
}

And I still get invalid type byte 50.
I get the same if I don't do any collecting at all.

I've got dumbo installed in a virtual environment.
Here's how I'm running it:
$ ../python/env/bin/dumbo test.py -hadoop ~/hadoop-0.20.0.hbase/
-libjar build/dist/bobbie-mapred.jar -output /user/search/test
-inputformat org.apache.hadoop.hbase.mapred.TableInputFormat
-hadoopconf hbase.mapred.tablecolumns="fam1:qualifier1" -input
tablename

EXEC: HADOOP_CLASSPATH="build/dist/bobbie-mapred.jar:$HADOOP_CLASSPATH"
/home/search/hadoop-0.20.0.hbase//bin/hadoop jar
/home/search/hadoop-0.20.0.hbase//build/contrib/streaming/hadoop-0.20.1-dev-streaming.jar
-output '/user/search/test' -input 'db.catalogue' -cmdenv
'PYTHON_EGG_CACHE=/tmp/eggcache' -mapper 'python2.6 test.py map 0
367001600' -reducer 'python2.6 test.py red 0 367001600' -jobconf
'hbase.mapred.tablecolumns=artist:name' -file 'test.py' -jobconf
'stream.map.input=typedbytes' -jobconf
'stream.reduce.input=typedbytes' -jobconf
'stream.reduce.output=typedbytes' -jobconf
'stream.map.output=typedbytes' -jobconf
'mapred.job.name=test-/user/search/test (1/1)' -numReduceTasks '0'
-inputformat 'org.apache.hadoop.hbase.mapred.TableInputFormat'
-outputformat 'org.apache.hadoop.mapred.SequenceFileOutputFormat'
-file '/home/search/python/env/lib/python2.5/site-packages/dumbo-0.21.7-py2.5.egg'
-file '/home/search/python/env/lib/python2.5/site-packages/typedbytes-0.3.6-py2.5.egg'
-cmdenv 'PYTHONPATH=dumbo-0.21.7-py2.5.egg:typedbytes-0.3.6-py2.5.egg'
-file 'build/dist/bobbie-mapred.jar'

I just noticed, stream.map.input=typedbytes, is that as it should be?
It looks odd.

~Tim.


2009/7/27 Klaas Bosteels <klaas.b...@gmail.com>:

Klaas Bosteels

Jul 27, 2009, 10:00:25 AM
to dumbo...@googlegroups.com
Oh, I actually thought you were talking about a custom input format.
Obviously I didn't look very closely at your code :)

It seems to me that the problem is

-mapper 'python2.6 test.py map 0 367001600'

which should really be something like

-mapper 'fm.last.bobbie.mapred.HBaseDumboMapper'

in your case. How are you specifying the Java mapper exactly?

-Klaas

Tim Sell

Jul 27, 2009, 10:04:24 AM
to dumbo...@googlegroups.com
test.py looks like this:
<pre>
import dumbo

def runner(job):
    mapper = "fm.last.bobbie.mapred.HBaseDumboMapper"
    job.additer(mapper)

def starter(prog):
    pass

if __name__ == "__main__":
    dumbo.main(runner, starter)
</pre>

2009/7/27 Klaas Bosteels <klaas.b...@gmail.com>:

Klaas Bosteels

Jul 27, 2009, 10:17:54 AM
to dumbo...@googlegroups.com
Weird, that should work. Are you sure you're using Dumbo 0.21.20 or higher?

It seems to work fine for me:

$ cat test.py
def runner(job):
    mapper = "fm.last.feathers.map.Words"
    job.additer(mapper)

def starter(prog):
    pass

if __name__ == "__main__":
    import dumbo
    dumbo.main(runner, starter)  # you could also just do dumbo.main(runner) btw
$ dumbo test.py -input brian.txt -output testoutput -libjar
feathers/feathers.jar -python python2.6 -hadoop /usr/lib/hadoop/
EXEC: HADOOP_CLASSPATH="/home/klbostee/feathers/feathers.jar:$HADOOP_CLASSPATH"
/usr/lib/hadoop//bin/hadoop jar
/usr/lib/hadoop//contrib/streaming/hadoop-0.18.3-4cloudera0.3.0-streaming.jar
-input 'brian.txt' -output 'testoutput' -mapper
'fm.last.feathers.map.Words' -reducer 'python2.6 test.py red 0
262144000' -jobconf 'stream.map.input=typedbytes' -jobconf
'stream.reduce.input=typedbytes' -jobconf
'stream.reduce.output=typedbytes' -jobconf
'stream.map.output=typedbytes' -jobconf 'mapred.job.name=test.py
(1/1)' -numReduceTasks '0' -inputformat
'org.apache.hadoop.streaming.AutoInputFormat' -outputformat
'org.apache.hadoop.mapred.SequenceFileOutputFormat' -cmdenv
'PYTHONPATH=dumbo-0.21.20-py2.5.egg:typedbytes-0.3.6-py2.5.egg' -file
'/home/klbostee/test.py' -file
'/usr/lib/python2.5/site-packages/dumbo-0.21.20-py2.5.egg' -file
'/usr/lib/python2.5/site-packages/typedbytes-0.3.6-py2.5.egg' -file
'/home/klbostee/feathers/feathers.jar'
[...]
09/07/27 13:50:11 INFO streaming.StreamJob: Job complete: job_200905301419_0123
09/07/27 13:50:11 INFO streaming.StreamJob: Output: testoutput
$ dumbo cat testoutput/part* -hadoop /usr/lib/hadoop/ | head -n 3
Yes 1
his 1
face 1

-Klaas

Tim Sell

Jul 27, 2009, 10:27:40 AM
to dumbo...@googlegroups.com
ah crap hmmm, using 0.21.7
Not sure how that happened.
I'll upgrade and try again.


2009/7/27 Klaas Bosteels <klaas.b...@gmail.com>:

Tim Sell

Jul 27, 2009, 11:28:30 AM
to dumbo...@googlegroups.com
Thanks for your help Klaas, it turned out it was just because I was
using an old version :P heh, so frustrating.
So below is my final class for outputting hbase columns to typed bytes
(as strings). You will get typed bytes with:
key = row
value = a dict { family : { qualifier : value } }

To run this, do:

dumbo test.py -hadoop <hadoopdir> -libjar <jar with below mapper in it> \
  -inputformat org.apache.hadoop.hbase.mapred.TableInputFormat \
  -hadoopconf hbase.mapred.tablecolumns="family1:qualifier1 family2:qualifier2" \
  -input <tablename> -output <outdir>

<pre>
package fm.last.bobbie.mapred;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.typedbytes.TypedBytesWritable;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Mapper class for fetching columns from hbase and returning typed bytes
 * for use with dumbo. Uses the deprecated mapred api, because hadoop
 * streaming uses it.
 *
 * @author tims
 */
public class HBaseDumboMapper implements
    Mapper<ImmutableBytesWritable, RowResult, TypedBytesWritable, TypedBytesWritable> {

  @Override
  public void map(ImmutableBytesWritable key, RowResult value,
      OutputCollector<TypedBytesWritable, TypedBytesWritable> collector,
      Reporter reporter) throws IOException {

    // family -> qualifier -> value
    Map<String, Map<String, String>> columns = new HashMap<String, Map<String, String>>();

    for (Entry<byte[], Cell> entry : value.entrySet()) {
      String cellValue = Bytes.toString(entry.getValue().getValue());
      String columnStr = Bytes.toString(entry.getKey());
      String[] familyQualifier = columnStr.split(":");
      if (familyQualifier.length != 2) {
        throw new IOException("cell column not in form family:qualifier, got " + columnStr);
      }
      String family = familyQualifier[0];
      String qualifier = familyQualifier[1];
      Map<String, String> column = columns.get(family);
      if (column == null) {
        column = new HashMap<String, String>();
      }
      column.put(qualifier, cellValue);
      columns.put(family, column);
    }

    TypedBytesWritable row = new TypedBytesWritable();
    row.setValue(Bytes.toString(value.getRow()));

    TypedBytesWritable tbColumns = new TypedBytesWritable();
    tbColumns.setValue(columns);

    collector.collect(row, tbColumns);
  }

  @Override
  public void configure(JobConf job) {
  }

  @Override
  public void close() throws IOException {
  }
}
</pre>


2009/7/27 Tim Sell <trs...@gmail.com>:

Tim Sell

Jul 27, 2009, 11:32:24 AM
to dumbo...@googlegroups.com
Talking with Mat Lehmann on #hbase, it would be nice to be able to
wrap a java mapper in a python mapper; then we could do things like
use a python mapper on the above HBaseDumboMapper seamlessly. IMO this
would be much nicer than writing a custom inputformat for hbase that
emits typed bytes.

Can we do this already in dumbo, or shall I file an issue?

~Tim.

2009/7/27 Tim Sell <trs...@gmail.com>:

Tim Sell

Jul 27, 2009, 11:35:15 AM
to dumbo...@googlegroups.com
Oh oops, in the above example of how to use it, I meant to say that the
test.py file would look like this:

<pre>
import dumbo

def runner(job):
    mapper = "fm.last.bobbie.mapred.HBaseDumboMapper"
    job.additer(mapper, None)

def starter(prog):
    pass

if __name__ == "__main__":
    dumbo.main(runner, starter)
</pre>

2009/7/27 Tim Sell <trs...@gmail.com>:

Klaas Bosteels

Jul 27, 2009, 11:41:51 AM
to dumbo...@googlegroups.com
On Mon, Jul 27, 2009 at 5:32 PM, Tim Sell <trs...@gmail.com> wrote:
>
> Can we do this already in dumbo, or shall I file an issue?

I'm afraid not. Feel free to file a ticket, but this would probably
require changes to Streaming, so it's probably not exactly an easy
addition.

Why would this be better/nicer than just implementing an extension of
the hbase inputformat that outputs typed bytes, though?

-Klaas

Tim Sell

Jul 27, 2009, 11:57:36 AM
to dumbo...@googlegroups.com
Just because it's easier to write a mapper, but if we have to change
streaming, it's probably easier to write a new input format.

2009/7/27 Klaas Bosteels <klaas.b...@gmail.com>:

Tim Sell

Jul 27, 2009, 3:30:15 PM
to dumbo...@googlegroups.com
I wrote a custom input format, TypedBytesTableInputFormat.
http://pastebin.com/f42c5d626

I couldn't be bothered figuring out how to set the max number of
mappers from dumbo, so I just let it be the number of regions in
the table you are scanning.
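
Roughly, the splits logic is like this (an illustrative sketch from
memory of the 0.20-era mapred api, untested, so check the pastebin for
the real thing):

<pre>
// Illustrative sketch only: one input split per region, ignoring the
// numSplits hint. Assumes TableInputFormatBase's protected getHTable()
// and the old org.apache.hadoop.hbase.mapred.TableSplit.
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  byte[][] startKeys = getHTable().getStartKeys();  // one entry per region
  InputSplit[] splits = new InputSplit[startKeys.length];
  for (int i = 0; i < startKeys.length; i++) {
    byte[] endKey = (i + 1 < startKeys.length)
        ? startKeys[i + 1] : HConstants.EMPTY_END_ROW;
    splits[i] = new TableSplit(getHTable().getTableName(), startKeys[i], endKey, "");
  }
  return splits;
}
</pre>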

It outputs typed bytes as above, still strings, since there's the
Buffer bug and I'm using strings anyway.
It's quite nice to use, way better than using the mapper I posted earlier.
e.g.:
<pre>
#test2.py
import dumbo

def mapper(key, columns):
    for family in columns:
        for qualifier, value in columns[family].iteritems():
            yield key, (family, qualifier, value)

def runner(job):
    job.additer(mapper)

def starter(prog):
    pass

if __name__ == "__main__":
    dumbo.main(runner, starter)
</pre>


I should probably stick this on github or something, and eventually
add it to hbase.
Now we just need a TypedBytesTableOutputFormat :)
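
Something like this for its RecordWriter's write() method, maybe (a
totally untested sketch, assuming the old BatchUpdate api and the same
{family: {qualifier: value}} string layout as above):

<pre>
// Untested sketch: turn (row, {family: {qualifier: value}}) typed bytes
// back into an HBase BatchUpdate. Assumes string keys/values and an
// HTable field called "table" opened by the output format.
public void write(TypedBytesWritable key, TypedBytesWritable value) throws IOException {
  BatchUpdate update = new BatchUpdate(Bytes.toBytes((String) key.getValue()));
  @SuppressWarnings("unchecked")
  Map<String, Map<String, String>> columns = (Map<String, Map<String, String>>) value.getValue();
  for (Entry<String, Map<String, String>> family : columns.entrySet()) {
    for (Entry<String, String> qv : family.getValue().entrySet()) {
      update.put(family.getKey() + ":" + qv.getKey(), Bytes.toBytes(qv.getValue()));
    }
  }
  table.commit(update);
}
</pre>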

~Tim.


2009/7/27 Tim Sell <trs...@gmail.com>:

Klaas Bosteels

Jul 28, 2009, 2:40:13 AM
to dumbo...@googlegroups.com
Awesome! Slap it on Github (or submit it to HBase) and I'll blog about it... :)

-Klaas

Mat Lehmann

Jul 28, 2009, 4:24:43 AM
to dumbo-user
Kudos and many thanks for the TypedBytesTableInputFormat. I was working
on a similar thing, but you were simply faster.
