Puzzled by redundant output with multiple runs

piccolbo

Sep 3, 2010, 3:54:14 PM
to dumbo-user
I have this little test program. I wanted to learn whether I can pass
arguments to __init__ of a mapper class.

import dumbo
import sys

class TestMapper:
    def __init__(self, _val):
        sys.stderr.write("Executing __init__" + str(_val) + "\n")
        self.val = _val

    def __call__(self, key, val):
        yield key, self.val  # using init val, not MR input val

if __name__ == '__main__':
    sys.stderr.write("before run 1\n")
    dumbo.run(TestMapper(1), opts = [("output", "test1")])
    sys.stderr.write("in between runs\n")
    dumbo.run(TestMapper(2), opts = [("output", "test2")])
    sys.stderr.write("after run 2\n")

This is the console output. Why on earth does "before run 1" print
twice, before and after run 1? Also, "in between runs" prints twice
and so does "after run 2". This is puzzling. As to the original
question, the stderr prints might not be totally clear, but the mapper
object seems to get instantiated with 1 and never with 2, or if it
does, that instance is never used. Can somebody please enlighten me?
Thanks

Antonio

rl1:~/dumbo/dc3$ dumbo start testinit.py -python python -input
mississippi.txt
before run 1
Executing __init__1
EXEC: PYTHONPATH="/usr/local/lib/python2.6/dist-packages/dumbo-0.21.26-
py2.6.egg:$PYTHONPATH" python -m dumbo.cmd encodepipe -file
mississippi.txt | PYTHONPATH="/usr/local/lib/python2.6/dist-packages/
dumbo-0.21.26-py2.6.egg:$PYTHONPATH"
dumbo_mrbase_class='dumbo.backends.common.MapRedBase'
dumbo_jk_class='dumbo.backends.common.JoinKey' python -m testinit map
0 262144000 > 'test1'
before run 1
Executing __init__1
in between runs
Executing __init__2
after run 2
in between runs
Executing __init__2
EXEC: PYTHONPATH="/usr/local/lib/python2.6/dist-packages/dumbo-0.21.26-
py2.6.egg:$PYTHONPATH" python -m dumbo.cmd encodepipe -file
mississippi.txt | PYTHONPATH="/usr/local/lib/python2.6/dist-packages/
dumbo-0.21.26-py2.6.egg:$PYTHONPATH"
dumbo_mrbase_class='dumbo.backends.common.MapRedBase'
dumbo_jk_class='dumbo.backends.common.JoinKey' python -m testinit map
0 262144000 > 'test2'
before run 1
Executing __init__1
in between runs
Executing __init__2
after run 2
after run 2
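
On the original question (can arguments be passed to __init__ of a mapper class?), the callable-object part can be checked locally without Hadoop or dumbo. The sketch below is only a local simulation: apply_mapper is a hypothetical helper written for this illustration, not part of dumbo, and it simply feeds (key, value) pairs to the mapper instance the way a map phase would.

```python
class TestMapper:
    """Mapper that ignores the input value and emits the constructor argument."""

    def __init__(self, val):
        self.val = val

    def __call__(self, key, value):
        # Emit the value captured at construction time, not the input value.
        yield key, self.val


def apply_mapper(mapper, records):
    """Hypothetical local stand-in for a map phase: call the mapper on each record."""
    return [pair for key, value in records for pair in mapper(key, value)]


if __name__ == "__main__":
    print(apply_mapper(TestMapper(1), [(0, "a"), (1, "b")]))
```

The instance keeps whatever was passed to __init__, so each mapper object carries its own configuration into __call__.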




Klaas Bosteels

Sep 7, 2010, 5:09:50 AM
to dumbo...@googlegroups.com
Hey Antonio,

You should only ever have one dumbo.run() call and it's normal for
dumbo scripts to kind of get executed twice yeah (there are reasons
for this but you shouldn't have to worry about those as a dumbo user).

Instead, you should use a runner if you want to have multiple
map/reduce iterations:

import dumbo
import sys

def runner(job):
    job.additer(TestMapper(1), opts = [("output", "test1")])
    job.additer(TestMapper(2), opts = [("output", "test2")])

class TestMapper:
    def __init__(self, _val):
        sys.stderr.write("Executing __init__" + str(_val) + "\n")
        self.val = _val

    def __call__(self, key, val):
        yield key, self.val  # using init val, not MR input val

if __name__ == "__main__":
    dumbo.main(runner)


Hope this helps,
-Klaas
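
The repeated output in the original console log is consistent with the EXEC lines there, which show the same module being launched again as "python -m testinit map ...". The toy model below illustrates only that general pattern: a driver process re-launches its own script as a task, so every top-level statement runs once per process. It is a simplified sketch, not dumbo's actual implementation; toy_job.py and the run function inside it are made up for the illustration.

```python
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path

# A tiny script that re-launches itself, written to a temp file by demo().
SCRIPT = textwrap.dedent('''
    import subprocess, sys

    def run(phase_args):
        # Driver mode: no phase argument yet, so launch this same file as a task.
        if len(phase_args) == 0:
            subprocess.check_call([sys.executable, __file__, "map"])
        # Task mode: a real framework would read records and call the mapper here.

    print("before run", flush=True)   # top-level: executed by every process
    run(sys.argv[1:])
    print("after run", flush=True)    # top-level: executed by every process
''')


def demo():
    """Run the toy script once and return everything it (and its child) printed."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "toy_job.py"
        path.write_text(SCRIPT)
        result = subprocess.run(
            [sys.executable, str(path)],
            capture_output=True, text=True, check=True,
        )
        return result.stdout.splitlines()


if __name__ == "__main__":
    print(demo())
```

Running demo() returns ["before run", "before run", "after run", "after run"]: the driver prints "before run", the child process prints both lines, and then the driver prints "after run". With two run calls in one script the interleaving gets correspondingly messier.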

piccolbo

Sep 7, 2010, 7:43:27 PM
to dumbo-user
Thanks Klaas, that's clear now but not a solution for my use case. In
fact I was trying to use multiple dumbo.run() calls for a deeper
reason besides this test of __init__, let me try to explain. The
second iteration can and should be performed only based on some
property of the results of the first. Based on your example, something
like this:

def runner(job):
    job.additer(TestMapper(1), opts = [("output", "test1")])
    if some_predicate("test1"):
        job.additer(TestMapper(2), opts = [("output", "test2")])

This doesn't work, though, because additer does not write anything into
test1 before it returns, so some_predicate cannot be evaluated. What
I need is a dumbo.run_it_now_not_later(), or the equivalent of runJob
in Hadoop for Java, so that I can examine the output of an iteration
and make decisions. That is what the dumbo command does at the shell
level, but I want it with a Python API.

This is not an odd use case. There are many important algorithms that
can be implemented with a variable number of MR iterations but have no
known solution with a fixed number (of course one can always do
everything in one reducer, but that's not parallel or scalable). For
instance, if you want to find the connected components of a large
graph, both the algorithm in Jimmy Lin's book and the one I wrote up
on my blog (blog.piccolboni.info) require a variable number of
iterations. If somebody has one with a constant number, I am not aware
of it.

The alternatives I can think of are calling dumbo multiple times,
which I can do only by going to the shell script level or by using
popen in Python, or writing everything in Java, neither of which is
very appealing to me. Any thoughts? Thanks


Antonio
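
The "call dumbo multiple times from Python" alternative mentioned above could be sketched roughly as follows. This is a minimal sketch under stated assumptions: the command line mirrors the "dumbo start ... -input ..." invocation shown earlier in the thread plus an -output option; the iterN output names, the should_continue callback, and the max_iters cap are all hypothetical choices made for the illustration. The launcher is injectable so the loop can be exercised without dumbo installed.

```python
import subprocess


def dumbo_command(script, input_path, output_path):
    """Build the argument list for one dumbo invocation (one MR iteration)."""
    return ["dumbo", "start", script, "-input", input_path, "-output", output_path]


def iterate(script, first_input, should_continue,
            launch=subprocess.check_call, max_iters=50):
    """Run one job per iteration until should_continue(output_path) is false.

    Each iteration reads the previous iteration's output. The predicate is
    evaluated only after the job has finished, so it can inspect real results.
    """
    input_path = first_input
    outputs = []
    for i in range(max_iters):
        output_path = "iter%d" % i          # hypothetical naming scheme
        launch(dumbo_command(script, input_path, output_path))
        outputs.append(output_path)
        if not should_continue(output_path):
            break
        input_path = output_path
    return outputs
```

For example, iterate("components.py", "graph.txt", some_predicate) would keep launching jobs until some_predicate reports that the latest output needs no further pass; components.py and graph.txt are placeholder names.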

Klaas Bosteels

Sep 8, 2010, 5:59:00 AM
to dumbo...@googlegroups.com
You might be able to use a variator for this:

def variator(prog):
    i = 0
    while some_predicate:  # placeholder for the real stopping condition
        clone = prog.clone()
        clone.addopt("param", "iteration=" + str(i))
        clone.addopt("input", nr_to_input(i))    # placeholder helper
        clone.addopt("output", nr_to_output(i))  # placeholder helper
        yield clone
        i += 1  # Python has no ++ operator

def starter(prog):
    pass  # or set general opts or so

def runner(job):
    job.additer(Mapper)

Please blog about it if it works, since this is completely
undocumented functionality as far as I know :)

-K

Klaas Bosteels

Sep 8, 2010, 7:26:01 AM
to dumbo...@googlegroups.com
I probably should've mentioned that to use the variator you then have to do:

dumbo.main(runner, starter, variator)

-K
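
Why a generator-based variator might suit the variable-iteration case can be shown without dumbo at all. The sketch below assumes that whatever consumes the variator starts each yielded program before asking for the next one; the thread does not confirm that dumbo behaves this way. FakeProgram and drive are hypothetical stand-ins written for this illustration, and only the clone and addopt method names are taken from the example above.

```python
class FakeProgram:
    """Stand-in for the object passed to a variator; not dumbo's real class."""

    def __init__(self, opts=None, log=None):
        self.opts = list(opts or [])
        # Clones share one log so the predicate can see finished iterations.
        self.log = log if log is not None else []

    def clone(self):
        return FakeProgram(self.opts, self.log)

    def addopt(self, key, value):
        self.opts.append((key, value))

    def start(self):
        # "Run" the job by recording which output it would have produced.
        self.log.append(dict(self.opts)["output"])
        return 0


def make_variator(should_continue):
    """Build a variator whose loop condition is re-checked before each clone."""
    def variator(prog):
        i = 0
        while should_continue(i):
            clone = prog.clone()
            clone.addopt("input", "iter%d" % i)
            clone.addopt("output", "iter%d" % (i + 1))
            yield clone
            i += 1
    return variator


def drive(prog, variator):
    """Hypothetical consumer: start each program before requesting the next."""
    for program in variator(prog):
        program.start()
```

Because the generator body resumes only when the consumer asks for the next program, should_continue runs after the previous iteration has finished and can base its decision on that iteration's results.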
