MultipleOutputs support

dmi...@tellapart.com

Nov 17, 2009, 12:46:54 AM
to cascading-user
Hi folks,

I'm wondering whether Cascading has any support for MultipleOutputs
from a reducer (or any plan to add it). My scenario is as follows:

1. I have a pipe that does a GroupBy followed by an Every on a bunch
of data.
2. After this point, I have a set of tuples with Fields foo and bar.
I'd like to split these into two different outputs based on the value
of foo: if foo is null, write the tuple to output_null; for all other
values, write it to output_all.

I could accomplish this in Cascading by creating an additional two
pipes, each of which will run Each with a Filter to eliminate the
unwanted values of foo. But as far as I can tell, in order to output
each of these pipes to a different location, I'd need another GroupBy
here to rename each of these pipes to a different groupName, so that
this groupName can be associated with another sink for the data.
Something like this:

pipe = new GroupBy(pipe, ...);
pipe = new Every(pipe, new SomeBuffer(), Fields.RESULTS);

nullPipe = new Each(pipe, new FilterNull());
otherPipe = new Each(pipe, new FilterNotNull());

nullPipe = new GroupBy("output_null", nullPipe, ...);
otherPipe = new GroupBy("output_other", otherPipe, ...);

So I wind up having two map-reduce steps in this flow.

I could accomplish the same in Hadoop MapReduce world with just one
map-reduce step, by using MultipleOutputs as follows:

public void reduce(...) {
    ...
    if (foo == null) {
        collector = multipleOutputs.getCollector("output_null");
    } else {
        collector = multipleOutputs.getCollector("output_all");
    }

    collector.collect(...);
}

Is there any way we could do this in Cascading? Or is there some
other way of structuring my flow to avoid having two map-reduce steps?

Thanks,

- Dmitry

Chris Curtin

Nov 17, 2009, 9:40:09 AM
to cascading-user
Hi,

Won't a TemplateTap work? http://www.cascading.org/userguide/htmlsingle/#N20B60

Chris

Ken Krugler

Nov 17, 2009, 12:15:13 PM
to cascadi...@googlegroups.com
Hi Dmitry,

Funny, I was asking a similar question on the list last week - basically how to efficiently split tuples in a pipe into two sets.

Since you want to write the sets out, I think the most efficient approach would be to modify the new MultiSinkTap (in WIP 1.1) to also take a split function.

Then in the MultiSinkCollector.collect() method you'd use this to decide which of the taps to use.
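
As a rough illustration of the split-function idea (not the actual MultiSinkTap code, which this thread does not show), here is a plain-Java sketch with no Cascading dependency. SplitCollectorSketch, its collect() and getSink() methods, and the list-based sinks are hypothetical names made up for this example; a real version would presumably write to the child taps' collectors rather than to in-memory lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical stand-in for a collector that routes each tuple to one of
// several sinks in a single pass. This is not Cascading's MultiSinkTap API.
public class SplitCollectorSketch<T> {
    private final List<List<T>> sinks = new ArrayList<>();
    private final ToIntFunction<T> splitFunction;

    public SplitCollectorSketch(int sinkCount, ToIntFunction<T> splitFunction) {
        for (int i = 0; i < sinkCount; i++) {
            sinks.add(new ArrayList<>());
        }
        this.splitFunction = splitFunction;
    }

    // Called once per tuple; the split function picks the sink index.
    public void collect(T tuple) {
        int index = splitFunction.applyAsInt(tuple);
        if (index < 0 || index >= sinks.size()) {
            throw new IllegalArgumentException("No sink for index " + index);
        }
        sinks.get(index).add(tuple);
    }

    public List<T> getSink(int index) {
        return sinks.get(index);
    }

    public static void main(String[] args) {
        // Tuples are modelled as String[]{foo, bar}; sink 0 takes foo == null.
        SplitCollectorSketch<String[]> collector =
            new SplitCollectorSketch<>(2, t -> t[0] == null ? 0 : 1);
        collector.collect(new String[] {null, "a"});
        collector.collect(new String[] {"x", "b"});
        collector.collect(new String[] {"y", "c"});
        System.out.println("output_null: " + collector.getSink(0).size());
        System.out.println("output_all: " + collector.getSink(1).size());
    }
}
```

The point of the sketch is that the routing decision happens at write time, so no extra pipe, filter, or map-reduce step is needed to separate the two sets.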

But in any case you don't need another GroupBy in your example below. This should be sufficient:

pipe = new GroupBy(pipe, ...);
pipe = new Every(pipe, new SomeBuffer(), Fields.RESULTS);

Pipe nullPipe = new Pipe("output_null", pipe);
nullPipe = new Each(nullPipe, new FilterNull());

Pipe otherPipe = new Pipe("output_other", pipe);
otherPipe = new Each(otherPipe, new FilterNotNull());

The act of creating a new pipe from your original pipe splits it.

At the end is a Cascading SubAssembly I use in Bixo as a convenience wrapper for splitting pipes in this manner. The ISplitter interface is just:

public interface ISplitter {
public boolean isLHS(Tuple tuple);
}

-- Ken
import java.security.InvalidParameterException;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Filter;
import cascading.operation.FilterCall;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

// NullContext is a helper class from Bixo; its import is omitted here.
public class SplitterAssembly extends SubAssembly {
    private static final String LHS_SUFFIX = "-lhs";
    private static final String RHS_SUFFIX = "-rhs";

    private String _baseName;

    // Removes every tuple that does not belong to the requested side.
    private static class SplitterFilter extends BaseOperation<NullContext> implements Filter<NullContext> {
        private ISplitter _splitter;
        private boolean _wantLHS;

        public SplitterFilter(ISplitter splitter, boolean wantLHS) {
            _splitter = splitter;
            _wantLHS = wantLHS;
        }

        @Override
        public boolean isRemove(FlowProcess flowProcess, FilterCall<NullContext> filterCall) {
            return _splitter.isLHS(filterCall.getArguments().getTuple()) != _wantLHS;
        }
    }

    public SplitterAssembly(Pipe inputPipe, ISplitter splitter) {
        _baseName = inputPipe.getName();

        // Creating two named pipes from the same input splits the stream.
        Pipe lhsPipe = new Pipe(_baseName + LHS_SUFFIX, inputPipe);
        lhsPipe = new Each(lhsPipe, new SplitterFilter(splitter, true));

        Pipe rhsPipe = new Pipe(_baseName + RHS_SUFFIX, inputPipe);
        rhsPipe = new Each(rhsPipe, new SplitterFilter(splitter, false));

        setTails(lhsPipe, rhsPipe);
    }

    public Pipe getLHSPipe() {
        return getTailPipe(_baseName + LHS_SUFFIX);
    }

    public Pipe getRHSPipe() {
        return getTailPipe(_baseName + RHS_SUFFIX);
    }

    // Looks up a tail pipe by name.
    private Pipe getTailPipe(String pipeName) {
        String[] pipeNames = getTailNames();
        for (int i = 0; i < pipeNames.length; i++) {
            if (pipeName.equals(pipeNames[i])) {
                return getTails()[i];
            }
        }

        throw new InvalidParameterException("Invalid pipe name: " + pipeName);
    }
}



--------------------------------------------
Ken Krugler
e l a s t i c   w e b   m i n i n g




dmi...@tellapart.com

Nov 17, 2009, 4:42:59 PM
to cascading-user
Thanks all.

Ken, thanks for the renaming trick - I hadn't thought of that. As I
suspected, this results in an extra MR step, but at this point that
step is fairly cheap, so I'm willing to let it slide. Modifying the
MultiSinkTap as you suggest sounds like the right approach for
efficiency, though.

Chris, I think the TemplateTap would work if I wanted to have one
output for each value of foo -- but unfortunately in my case I have
many possible values of foo and I'd like to group some of those values
into one output, others into another, and yet another set of values
into a third. I don't think TemplateTap is quite that flexible yet.
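
The grouping described here amounts to a many-to-few mapping from values of foo to output names. A minimal plain-Java sketch of such a routing function follows, assuming made-up value groups: the class FooRouterSketch, the values "a" through "d", and the output names are all hypothetical, since the thread does not say which values belong together or where a null foo should go.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical routing function: maps many values of foo to three outputs.
public class FooRouterSketch {
    // Made-up value groups; the real groups are not given in the thread.
    private static final Set<String> FIRST_GROUP =
        new HashSet<>(Arrays.asList("a", "b"));
    private static final Set<String> SECOND_GROUP =
        new HashSet<>(Arrays.asList("c", "d"));

    // Returns the name of the output a tuple with this foo should go to.
    // Anything not listed above, including null, falls through to the
    // third output (an assumption made for this sketch).
    public static String outputFor(String foo) {
        if (FIRST_GROUP.contains(foo)) {
            return "output_first";
        }
        if (SECOND_GROUP.contains(foo)) {
            return "output_second";
        }
        return "output_rest";
    }

    public static void main(String[] args) {
        for (String foo : new String[] {"a", "c", "z", null}) {
            System.out.println(foo + " -> " + outputFor(foo));
        }
    }
}
```

A function of this shape could serve as the decision step in either approach discussed above: as the test inside a Filter on each split pipe, or as the split function of a modified sink.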

- Dmitry