Bind multiple sources to a flow problem


Guillaume

Feb 10, 2011, 5:06:38 AM
to cascading-user
Hello,

I am trying to use Cascading to simplify and chain MapReduce jobs I
have written.
The resulting flow will retrieve data from multiple sources and join
them in order to create one big object.

My flow looks just like the multiple-sources example from the user
guide.
The only difference is that I use a custom Tap to read my data. This
Tap is built on a custom InputFormat I wrote for my Hadoop jobs.

The problem is that my different head Pipes appear to be using
the same source Tap, and I don't understand why. As a result, I
get a ClassCastException in an operation, because the source object
of this operation is not what I expected. I've tried to debug the flow
process, but I couldn't see where it goes wrong, except that it
builds a FlowStep with a single source Tap and applies all my
operations in this FlowStep to that single source.

Here is a simplified overview of my Flow code (written for
testing, so it does not write the proper output):

//Workflow sources
Tap matchSource = new FileListTap(DataSourceInterface.MATCH_PROP);
Tap newSourceSource = new FileListTap(DataSourceInterface.NEW_SOURCE_PROP);

//sink
Scheme sinkScheme = new WritableSequenceFile(new Fields("transitId", "SourceObject"),
    LongWritable.class, GaiaWritable.class);
Tap sink = new Hfs( sinkScheme, "Ingestion", SinkMode.REPLACE );

//multiple heads
Pipe matchPipe = new Pipe( "MatchPipe" );
Pipe newSourcePipe = new Pipe( "NewSourcePipe" );

Pipe[] headPipes = Pipe.pipes(matchPipe, newSourcePipe);
Tap[] sourceTabs = Tap.taps(matchSource, newSourceSource);

Map<String, Tap> sources = Cascades.tapsMap(headPipes, sourceTabs);

//Match filtering
matchPipe = new Each(matchPipe, new MatchFilter());

//Tuple transforming
matchPipe = new Each(matchPipe, new MatchTupleTransformer());
newSourcePipe = new Each(newSourcePipe, new NewSourceTupleTransformer());

//Join on sourceId
Fields common = new Fields("sourceId");
Fields declared = new Fields("transitId", "sourceId1", "MatchObject", "sourceId2", "SourceObject");
Pipe joinMatchNewSource = new CoGroup( matchPipe, common, newSourcePipe, common, declared, new InnerJoin() );

//Flow connection and execution
// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, MatchAndNewSourceCU4Ingestion.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "Ingestion", sources, sink, joinMatchNewSource );

flow.writeDOT("./MyFlowMatchAndNew");
// execute the flow, block until complete
flow.complete();

The exception I get:
Caused by: java.lang.ClassCastException: job.datainput.NewSourcePrimaryKey cannot be cast to job.datainput.MatchPrimaryKey
at job.hadoop.cascading.operation.MatchFilter.isRemove(MatchFilter.java:56)
at cascading.pipe.Each.applyFilter(Each.java:372)
at cascading.pipe.Each.access$300(Each.java:53)
at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
at cascading.pipe.Each$EachHandler.operate(Each.java:478)
... 8 more

Does anyone have any idea about what I'm doing wrong?

Guillaume

Feb 10, 2011, 5:37:14 AM
to cascading-user
I think I have found a solution to my first problem: my custom Tap had
to define the equals method.
I figured this out by seeing that in FlowStep, sources are stored in a
Map<Tap, String>. My tap was always being replaced because the default
implementation of equals in Tap is very permissive.
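
To make the Map<Tap, String> point concrete, here is a minimal sketch in plain Java with no Cascading classes involved. PermissiveTap and PathTap are hypothetical stand-ins, and the path strings are invented for the illustration. The only thing assumed from the thread is that sources end up as keys of a map, so two keys that compare equal collapse into one entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class TapEqualsDemo {

    // Hypothetical stand-in: equals() treats every instance of the class as equal.
    static class PermissiveTap {
        final String path;

        PermissiveTap(String path) { this.path = path; }

        @Override
        public boolean equals(Object other) {
            return other instanceof PermissiveTap;
        }

        @Override
        public int hashCode() {
            return PermissiveTap.class.hashCode();
        }
    }

    // Hypothetical stand-in: equals() compares the attribute that identifies the source.
    static class PathTap {
        final String path;

        PathTap(String path) { this.path = path; }

        @Override
        public boolean equals(Object other) {
            return other instanceof PathTap && path.equals(((PathTap) other).path);
        }

        @Override
        public int hashCode() {
            return Objects.hash(path);
        }
    }

    // Registers two taps as map keys and returns how many entries survive.
    static int countSources(Object first, Object second) {
        Map<Object, String> sources = new HashMap<>();
        sources.put(first, "MatchPipe");
        sources.put(second, "NewSourcePipe");
        return sources.size();
    }

    public static void main(String[] args) {
        // Permissive equals: the second put() overwrites the first entry.
        System.out.println(countSources(new PermissiveTap("match"), new PermissiveTap("newSource")));
        // Attribute-based equals: both sources are kept.
        System.out.println(countSources(new PathTap("match"), new PathTap("newSource")));
    }
}
```

With the permissive version only one entry remains, which would match the symptom of every head Pipe reading from the same source.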

Ken Krugler

Feb 10, 2011, 8:34:59 AM
to cascadi...@googlegroups.com
Two comments inline below.

-- Ken

On Feb 10, 2011, at 2:37am, Guillaume wrote:

> I think I have found a solution to my first problem: my custom Tap had
> to define the equals method.
> I figured this out by seeing that in FlowStep, sources are stored in a
> Map<Tap, String>. My tap was always being replaced because the default
> implementation of equals in Tap is very permissive.
>
> On Feb 10, 11:06, Guillaume <g.eynard.bonte...@gmail.com> wrote:
>> Hello,
>>
>> I am trying to use Cascading to simplify and chain MapReduce jobs I
>> have written.
>> The resulting flow will retrieve data from multiple sources and join
>> them in order to create one big object.
>>
>> My flow looks just like the multiple-sources example from the user
>> guide.
>> The only difference is that I use a custom Tap to read my data. This
>> Tap is built on a custom InputFormat I wrote for my Hadoop jobs.

Normally you'd just create a custom Scheme. I assume your input data
comes from HDFS, so then the tap is Hfs.

That would simplify what you have to implement.

What does this code look like? I assume this is your custom code, right?

>> at cascading.pipe.Each.applyFilter(Each.java:372)
>> at cascading.pipe.Each.access$300(Each.java:53)
>> at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
>> at cascading.pipe.Each$EachHandler.operate(Each.java:478)
>> ... 8 more
>>
>> Does anyone have any idea about what I'm doing wrong?

--------------------------
Ken Krugler
+1 530-210-6378
http://bixolabs.com
e l a s t i c w e b m i n i n g

Guillaume

Feb 10, 2011, 8:52:58 AM
to cascading-user
Actually, my input data does not come from HDFS; it comes
from disk space shared between all the clusters used by Hadoop. But
it is probably true that I haven't quite understood custom Taps and
Schemes. I could maybe use an existing Tap. Anyway, this
problem has been solved by my new implementation of the Tap.equals
method.

For the second comment: yes, it's part of my custom code too. It casts
the objects I read and does some filtering on them:

TupleEntry arguments = filterCall.getArguments();
if (arguments.size() == 1) {
    gWrite = (MyWritable<MatchPrimaryKey>) arguments.getObject(0);
} else {
    gWrite = (MyWritable<MatchPrimaryKey>) arguments.getObject(1);
}

// Keep the tuple if the flag is 2
if (gWrite.getObjectValue().getGaiaValue().getFlags() == 2) {
    System.out.println("Mapper->Match with flag == 2, transitId : "
        + gWrite.getObjectValue().getGaiaValue().getTransitId());
    return false;
}
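
A side note on the casts above: because of type erasure, a cast to a parameterized type such as MyWritable<MatchPrimaryKey> is unchecked, so it succeeds for any MyWritable and the ClassCastException only appears later, when the wrapped value is actually used. That may be why the trace points into isRemove rather than at the cast itself, though this is a guess. The sketch below uses hypothetical stand-ins (Wrapper, MatchKey, NewSourceKey), not the real classes, to show the behaviour and one way to guard against it.

```java
public class ErasedCastDemo {

    // Hypothetical key types standing in for the two primary-key classes.
    static class MatchKey { }
    static class NewSourceKey { }

    // Hypothetical generic wrapper standing in for a Writable parameterized by key type.
    static class Wrapper<T> {
        private final T value;

        Wrapper(T value) { this.value = value; }

        T getValue() { return value; }
    }

    // The cast to Wrapper<MatchKey> is unchecked, so it succeeds for any Wrapper.
    @SuppressWarnings("unchecked")
    static Wrapper<MatchKey> castOnly(Object argument) {
        return (Wrapper<MatchKey>) argument;
    }

    // Reports whether using the wrapped value works or fails.
    static String describe(Object argument) {
        Wrapper<MatchKey> wrapper = castOnly(argument); // no exception here
        try {
            MatchKey key = wrapper.getValue();          // the real type check happens here
            return "ok: " + key.getClass().getSimpleName();
        } catch (ClassCastException e) {
            return "failed on use";
        }
    }

    // Guarded variant: inspects the runtime type of the payload before using it.
    static boolean holdsMatchKey(Object argument) {
        return argument instanceof Wrapper
            && ((Wrapper<?>) argument).getValue() instanceof MatchKey;
    }

    public static void main(String[] args) {
        System.out.println(describe(new Wrapper<>(new MatchKey())));
        System.out.println(describe(new Wrapper<>(new NewSourceKey())));
        System.out.println(holdsMatchKey(new Wrapper<>(new NewSourceKey())));
    }
}
```

A guard like holdsMatchKey would not fix a wrong source binding, but it can make the failure easier to diagnose by reporting the unexpected type explicitly.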


Now I've got another problem, but I'm not sure yet where it
comes from. If I just apply my filter to what I read and then store it
in a sequence file, it works. But I'm trying to apply another operation
that transforms the incoming field into other fields, and then to store
part of these fields in a sequence file. Here is what my custom
operation looks like:

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class MatchTupleTransformer extends BaseOperation implements Function
{
    public MatchTupleTransformer()
    {
        // declare the fields this function emits
        super(new Fields("transitId", "sourceId", "MatchObject"));
    }

    public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
        // get the arguments TupleEntry
        TupleEntry arguments = functionCall.getArguments();

        // create a Tuple to hold our result values
        Tuple result = new Tuple();

        // look for the Writable among the arguments
        MyWritable<MatchPrimaryKey> gWrite;
        if (arguments.size() == 1) {
            gWrite = (MyWritable<MatchPrimaryKey>) arguments.getObject(0);
        } else {
            gWrite = (MyWritable<MatchPrimaryKey>) arguments.getObject(1);
        }
        System.out.println("Match Transforming " + arguments.getObject(0)
            + ", " + gWrite.getGaiaValue());

        // insert some values into the result Tuple
        result.add(gWrite.getObjectValue().getGaiaValue().getTransitId()); // transitId is a long
        result.add(gWrite.getObjectValue().getGaiaValue().getSourceId());  // sourceId is a long
        result.add(gWrite);

        // return the result Tuple
        functionCall.getOutputCollector().add( result );
    }
}

I apply this function because I need the sourceId and transitId
longs in order to do a join later. Maybe the problem is that transitId
and sourceId are not of type Writable? I don't know yet; I'm still
debugging.

Thank you for your answer !
Guillaume

Feb 11, 2011, 4:25:17 AM
to cascading-user
My next problem was solved by making my equals method a little more
permissive...

By defining an equals method in my scheme, Cascading was able to
differentiate my taps. But there must be multiple
instantiations of each tap during the flow process, because my equals
method was failing... Anyway, testing only the crucial attributes
of my scheme solves this problem.
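
For anyone hitting the same thing, here is a rough sketch of what "testing only the crucial attributes" can look like. It is plain Java, and DemoScheme, propertyName and sessionId are hypothetical names. It also assumes the extra instances are copies made by Java serialization, which is a guess at why equals was failing and not something confirmed in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

public class SchemeEqualsDemo {

    // Hypothetical scheme-like class with one crucial and one incidental attribute.
    static class DemoScheme implements Serializable {
        private static final long serialVersionUID = 1L;

        final String propertyName;   // crucial: identifies the data source
        transient String sessionId;  // incidental: not carried over to a serialized copy

        DemoScheme(String propertyName, String sessionId) {
            this.propertyName = propertyName;
            this.sessionId = sessionId;
        }

        // Compares only the attribute that identifies the source.
        @Override
        public boolean equals(Object other) {
            return other instanceof DemoScheme
                && propertyName.equals(((DemoScheme) other).propertyName);
        }

        @Override
        public int hashCode() {
            return propertyName.hashCode();
        }

        // Over-strict comparison that also looks at incidental state.
        boolean strictEquals(DemoScheme other) {
            return equals(other) && Objects.equals(sessionId, other.sessionId);
        }
    }

    // Creates a second instance the way a framework might: serialize, then deserialize.
    static DemoScheme copyViaSerialization(DemoScheme original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoScheme) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoScheme original = new DemoScheme("MATCH_PROP", "session-1");
        DemoScheme copy = copyViaSerialization(original);
        System.out.println(original.equals(copy));       // crucial attributes match
        System.out.println(original.strictEquals(copy)); // incidental state differs
    }
}
```

The copy still equals the original on the crucial attribute, while the strict comparison fails because the transient field is not restored. Keeping hashCode based on the same attribute as equals matters as soon as the object is used as a map key.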