Cascading Stream Comparing/CoGroup Question

Thomas Scott

Jun 11, 2012, 5:15:15 PM
to cascading-user
Hello, I'm rather new to Cascading 2.0, so I've been doing a lot of
trial and error. I have spent the last couple of hours reading the
documentation and I think I'm a little stuck. Essentially, I want to
check whether one field of a tuple is the same as a field in another
tuple from another stream. Is there a way to do that? I thought about
CoGroup, but I couldn't figure out the syntax for it.

Because both of my streams come from the same input tap, my immediate
solution was the following:

public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
    TupleEntry tupleEntry = functionCall.getArguments();
    TupleEntryCollector collector = functionCall.getOutputCollector();
    List list = null;
    if (isRdfType(tupleEntry)) {
        if (!list.contains(tupleEntry.getString("object"))) {
            list.add(tupleEntry.getString("object"));
        }
    }

    if (list.contains(tupleEntry.getString("object"))) {
        collector.add(tupleEntry);
    }
}

However that causes the reducing task to fail with the following
unhelpful message:


12/06/11 16:04:53 INFO util.HadoopUtil: resolving application jar from found main method on: Main
12/06/11 16:04:53 INFO planner.HadoopPlanner: using application jar: /Users/Thomas/Documents/Java/mvnPort/1/target/cascadingSort-1.0-SNAPSHOT-jar-with-dependencies.jar
12/06/11 16:04:53 INFO property.AppProps: using app.id: 708245521DACB4AC33166BA324116BF6
12/06/11 16:04:54 INFO util.Version: Concurrent, Inc - Cascading 2.0.0
12/06/11 16:04:54 INFO cascade.Cascade: [Sort] starting
12/06/11 16:04:54 INFO cascade.Cascade: [Sort] parallel execution is enabled: true
12/06/11 16:04:54 INFO cascade.Cascade: [Sort] starting flows: 1
12/06/11 16:04:54 INFO cascade.Cascade: [Sort] allocating threads: 1
12/06/11 16:04:54 INFO cascade.Cascade: [Sort] starting flow: Sort
12/06/11 16:04:54 INFO flow.Flow: [Sort] at least one sink is marked for delete
12/06/11 16:04:54 INFO flow.Flow: [Sort] sink oldest modified date: Wed Dec 31 17:59:59 CST 1969
12/06/11 16:04:54 INFO flow.Flow: [Sort] starting
12/06/11 16:04:54 INFO flow.Flow: [Sort] source: Dfs["TextLine[['line']->[ALL]]"]["/firstProject/input"]"]
12/06/11 16:04:54 INFO flow.Flow: [Sort] sink: Dfs["TextDelimited[['predicate', 'object']]"]["/firstProject/outputObjects"]"]
12/06/11 16:04:54 INFO flow.Flow: [Sort] parallel execution is enabled: true
12/06/11 16:04:54 INFO flow.Flow: [Sort] starting jobs: 2
12/06/11 16:04:54 INFO flow.Flow: [Sort] allocating threads: 2
12/06/11 16:04:54 INFO flow.FlowStep: [Sort] starting step: (1/2)
12/06/11 16:04:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
12/06/11 16:04:54 WARN snappy.LoadSnappy: Snappy native library not loaded
12/06/11 16:04:54 INFO mapred.FileInputFormat: Total input paths to process : 1
12/06/11 16:04:54 INFO flow.FlowStep: [Sort] submitted hadoop job: job_201206111602_0002
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] task completion events identify failed tasks
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] task completion events count: 9
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_m_000004_0, Status : SUCCEEDED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_m_000000_0, Status : SUCCEEDED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_m_000001_0, Status : SUCCEEDED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_m_000002_0, Status : SUCCEEDED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_r_000000_0, Status : FAILED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_r_000000_1, Status : FAILED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_r_000000_2, Status : FAILED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_r_000000_3, Status : TIPFAILED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] event = Task Id : attempt_201206111602_0002_m_000003_0, Status : SUCCEEDED
12/06/11 16:06:00 WARN flow.FlowStep: [Sort] abandoning step: (2/2) ...irstProject/outputObjects, predecessor failed: (1/2)
12/06/11 16:06:00 INFO flow.FlowStep: [Sort] stopping: (2/2) ...irstProject/outputObjects
12/06/11 16:06:00 INFO flow.Flow: [Sort] stopping all jobs
12/06/11 16:06:00 INFO flow.FlowStep: [Sort] stopping: (2/2) ...irstProject/outputObjects
12/06/11 16:06:00 INFO flow.FlowStep: [Sort] stopping: (1/2)
12/06/11 16:06:00 INFO flow.Flow: [Sort] stopped all jobs
12/06/11 16:06:00 WARN cascade.Cascade: [Sort] flow failed: Sort
cascading.flow.FlowException: step failed: (1/2), with job id: job_201206111602_0002, please see cluster logs for failure messages
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:193)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:137)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:122)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
12/06/11 16:06:00 INFO cascade.Cascade: [Sort] stopping all flows
12/06/11 16:06:00 INFO cascade.Cascade: [Sort] stopping flow: Sort
12/06/11 16:06:00 INFO flow.Flow: [Sort] stopping all jobs
12/06/11 16:06:00 INFO flow.FlowStep: [Sort] stopping: (2/2) ...irstProject/outputObjects
12/06/11 16:06:00 INFO flow.FlowStep: [Sort] stopping: (1/2)
12/06/11 16:06:00 INFO flow.Flow: [Sort] stopped all jobs
12/06/11 16:06:00 INFO cascade.Cascade: [Sort] stopped all flows
Exception in thread "main" cascading.cascade.CascadeException: flow failed: Sort
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:771)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:710)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)
Caused by: cascading.flow.FlowException: step failed: (1/2), with job id: job_201206111602_0002, please see cluster logs for failure messages
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:193)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:137)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:122)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
    ... 5 more

Any and all help is appreciated.
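
One detail of the operate method in this post is worth noting, independent of Cascading: `list` is set to null and never assigned, so the unconditional `list.contains(...)` call would throw a NullPointerException on every invocation. The thread does not confirm that this was the cause of the failed reduce task, so the plain-Java sketch below only demonstrates that behavior. The class and method names are made up for illustration and are not part of the posted code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Cascading classes are involved.
final class NullListSketch {

    /** Returns true if calling contains() on a never-assigned (null) list throws. */
    static boolean nullListThrows(String value) {
        List<String> list = null; // same initial state as in the posted method
        try {
            list.contains(value);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Returns true if the same call completes normally once the list is initialized. */
    static boolean initializedListWorks(String value) {
        List<String> list = new ArrayList<>();
        list.add(value);
        return list.contains(value);
    }

    public static void main(String[] args) {
        System.out.println("null list throws: " + nullListThrows("x"));
        System.out.println("initialized list works: " + initializedListWorks("x"));
    }
}
```

Initializing the list would only remove the exception. A list created inside the method is still empty on each call, so it cannot remember values seen in earlier tuples.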

Ken Krugler

Jun 11, 2012, 6:57:46 PM
to cascadi...@googlegroups.com
Hi Thomas,

Are you running this on a real cluster? Normally if you run in local mode, you'll get the root cause of the failure earlier in the console log output, before you get to the "step failed... please see cluster logs for failure messages" output.

Also I'm not sure what you mean by "both of my streams come from the same input tap". 

That sounds like you've got one dataset, and you're trying to find tuples where the value of field X is the same as the value of field Y.

If that's true, I could post some code that works in 1.2 (I haven't tried it in 2.0).

-- Ken
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.


--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Mahout & Solr




Thomas Scott

Jun 12, 2012, 9:59:01 AM
to cascadi...@googlegroups.com
I'm running in local mode using the Cloudera distribution of Hadoop. You are correct that I have one dataset, but I want to find tuples where field X in one tuple is the same as field X in another tuple. I would very much appreciate it if you could post some code that works in 1.2, since not much has changed in terms of syntax in 2.0.

Thanks,
-Thomas

Ken Krugler

Jun 12, 2012, 3:51:14 PM
to cascadi...@googlegroups.com
Hi Thomas,

On Jun 12, 2012, at 6:59am, Thomas Scott wrote:

> I'm running in local mode using the Cloudera distribution of Hadoop. You are correct in that I have one dataset but I want to find tuples where field X in one tuple is the same as field X in another tuple.

If you're looking at a match between the same field in tuples, then it's easy.

Pipe joined = new GroupBy(incomingPipe, new Fields("field X"));
joined = new Every(joined, new MyBufferThatProcessesDuplicateFieldValues(), Fields.RESULTS);

The custom Buffer would implement whatever logic you need to handle groups with more than one member (indicating duplicate values).
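
To make the group-then-process idea above concrete, here is a minimal plain-Java sketch of the same logic: group rows on one field, then keep only the groups with more than one member. It deliberately does not use the Cascading API. `DuplicateGroupSketch`, `duplicateGroups`, and the sample rows are hypothetical names chosen for illustration, and the map stands in for what GroupBy would hand to a Buffer one group at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation of "group on one field, then process each group".
// None of these names are Cascading API.
final class DuplicateGroupSketch {

    /** Groups rows by the value at keyIndex and returns only groups with more than one row. */
    static Map<String, List<String[]>> duplicateGroups(List<String[]> rows, int keyIndex) {
        Map<String, List<String[]>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(row[keyIndex], key -> new ArrayList<>()).add(row);
        }
        // A group of size 1 means the key value is unique, so drop it.
        groups.values().removeIf(group -> group.size() < 2);
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"s1", "p1", "A"},
            new String[] {"s2", "p2", "A"},
            new String[] {"s3", "p1", "B"});
        // Group on the third column and print each duplicated value with its row count.
        duplicateGroups(rows, 2).forEach(
            (key, group) -> System.out.println(key + " appears in " + group.size() + " rows"));
    }
}
```

In a real flow the grouping would be distributed, and the per-group logic would live in the Buffer's operate method rather than in a single in-memory map.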

Where it gets a little harder is if you want to join using two different fields. At least in 1.2, you have to work around an issue where a CoGroup doesn't let you do that type of join on the same incoming pipe.

-- Ken

Thomas Scott

Jun 13, 2012, 10:13:20 AM
to cascadi...@googlegroups.com
Thank you for your assistance Ken, it is very much appreciated. I'll try some custom buffers and see if I can get this working.

Thanks again,
-Thomas

Thomas Scott

Jun 15, 2012, 10:19:49 AM
to cascadi...@googlegroups.com
Alright so I have been bashing my brain against this problem for a couple of days and I still cannot seem to get the results I want. If I could again receive some of your insight on this I would appreciate it. I think I have diagnosed the problem but I don't know how to fix it. My iterator appears to only iterate through one TupleEntry, the same one that is received as the group. I have literally bruised my brain thinking this through so any help you would give me is appreciated. My buffer code and flow are as follows:


public class PredicateBuffer extends BaseOperation implements Buffer {

    public PredicateBuffer() {
        super(new Fields("subject","predicate", "object"));
    }

    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        TupleEntry group = bufferCall.getGroup();
        Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();
        while (arguments.hasNext()) {
            TupleEntry argument = arguments.next();

            ////////////////////////
            //Debugging block, apparently arguments has only one element and hasNext() returns false after the first run.
            int i=0;
            String bloc = "" + i;
            bufferCall.getOutputCollector().add(new Tuple(bloc,argument.getString("predicate"),argument.getString("object")));
            i = i+1;
            ////////////////////////

            if (group.getString("predicate").equalsIgnoreCase("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")) {
                if (isDesirable(argument,group)) {
                    bufferCall.getOutputCollector().add(new Tuple(argument.getString("subject"),argument.getString("predicate"),argument.getString("object")));
                }
            }
        }
    }

    protected boolean isDesirable(TupleEntry argument, TupleEntry group) {
        String argObjStr = argument.getString("object");
        String grpObjStr = group.getString("object");
        return grpObjStr.equalsIgnoreCase(argObjStr);
    }
}

Pipe assembly = new Pipe("Sort");
assembly = new Each(assembly, new Fields("line"), new LineSplitter()); //Splits up lines into Tuple Entries with ("subject","predicate","object") fields.
assembly = new Unique(assembly, new Fields("predicate","object"));
assembly = new GroupBy(assembly,new Fields("subject","predicate","object"), new Fields("predicate","object"));
assembly = new Every(assembly, new Fields("subject","predicate","object"), new PredicateBuffer(),Fields.REPLACE);

Ken Krugler

Jun 15, 2012, 10:36:59 AM
to cascadi...@googlegroups.com
Hi Thomas,

On Jun 15, 2012, at 7:19am, Thomas Scott wrote:

> Alright so I have been bashing my brain against this problem for a couple of days and I still cannot seem to get the results I want. If I could again receive some of your insight on this I would appreciate it. I think I have diagnosed the problem but I don't know how to fix it.

Without the input data and the desired result, it's hard to figure out what you're trying to do, so comments below are just guesses.

> My iterator appears to only iterate through one TupleEntry, the same one that is received as the group.

This line in your flow:

assembly = new GroupBy(assembly,new Fields("subject","predicate","object"), new Fields("predicate","object"));

Means that you'll get one group for each unique combination of subject, predicate, and object.

Since those are all the fields you have, you'll get one tuple in your buffer unless you have duplicates.

I'd guess that you want to group on subject only.

But you previously did a:

Unique(assembly, new Fields("predicate","object"));

So that makes me think I don't know what you're trying to do, thus my initial comment.
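
The point about grouping fields can be illustrated with a small plain-Java sketch. It is not Cascading code: `GroupKeySketch`, `groupSizes`, and the sample triples are hypothetical, and the columns are assumed to be subject, predicate, object in that order. It only counts how many rows land in each group for a given choice of key columns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of how the choice of grouping fields sets group size.
// Nothing here is Cascading API; all names are hypothetical.
final class GroupKeySketch {

    /** Returns the number of rows in each group when grouping on the given column indexes. */
    static Map<String, Integer> groupSizes(List<String[]> rows, int... keyIndexes) {
        Map<String, Integer> sizes = new LinkedHashMap<>();
        for (String[] row : rows) {
            List<String> parts = new ArrayList<>();
            for (int index : keyIndexes) {
                parts.add(row[index]);
            }
            sizes.merge(String.join("|", parts), 1, Integer::sum);
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"s1", "type", "A"},
            new String[] {"s1", "name", "B"},
            new String[] {"s2", "type", "A"});
        System.out.println(groupSizes(rows, 0, 1, 2)); // every group has exactly one row
        System.out.println(groupSizes(rows, 0));       // "s1" now has two rows
    }
}
```

Grouping on every column can only produce multi-row groups when the input contains exact duplicate rows, which matches the single-element iterator described in the question.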

-- Ken


Thomas Scott

Jun 15, 2012, 12:53:48 PM
to cascadi...@googlegroups.com
Ken,

My apologies for not having explained very well in the first place. What I think I am trying to do is this: I have a blob of triples each with <"subject","predicate","object">.  I want to find all the triples that have the predicate "type".  Then I want to take the objects associated with those triples that have the predicate "type" and find all the other predicates associated with them.

My thinking was that I would find all the unique predicate-object combinations to reduce the workload for the rest of the pipe. Then I would apply the buffer to the stream in order to get the desired result. Unfortunately I couldn't just put an Every pipe after a Unique pipe, so I stuck a GroupBy in there and configured it so that it maintained the stream from my Unique. I guess that's where things got messed up.

-Thomas

Ken Krugler

Jun 16, 2012, 9:53:54 AM
to cascadi...@googlegroups.com
On Jun 15, 2012, at 9:53am, Thomas Scott wrote:

> Ken,
>
> My apologies for not having explained very well in the first place. What I think I am trying to do is this: I have a blob of triples each with <"subject","predicate","object">. I want to find all the triples that have the predicate "type". Then I want to take the objects associated with those triples that have the predicate "type" and find all the other predicates associated with them.

OK, so you want to find the set of object values with predicate:xxx, and then find all of the predicates that have those same objects, right?

Danger, untested code typed pre-espresso:

Pipe p = <pipe with every tuple>

// Split the pipe
Pipe targetObjects = new Pipe("target objects", p);

// Filter out anything that has a predicate value we don't want.
// ExpressionFilter removes tuples for which the expression is true, so this keeps only "type".
targetObjects = new Each(targetObjects, new Fields("predicate"), new ExpressionFilter("!\"type\".equals($0)", String.class));

// Leave only the object field, and rename it
targetObjects = new Each(targetObjects, new Fields("object"), new Identity(new Fields("target-object")));

// Only leave unique values.
targetObjects = new Unique(targetObjects, new Fields("target-object"));

// Now group with the original stream.
p = new CoGroup(p, new Fields("object"), targetObjects, new Fields("target-object"), null, new RightJoin());

// At this point you should have a join of only the left side (full tuples) that have object values that match your target object values.
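
As a way to check the expected result of the pipe assembly above on a small sample, here is a plain-Java sketch of the same steps: collect the unique objects of triples with the wanted predicate, then keep every triple whose object is in that set. It is an in-memory stand-in, not Cascading code. `TargetObjectJoinSketch`, `triplesSharingTypedObjects`, and the sample data are hypothetical, and unlike the CoGroup output it does not append a `target-object` column.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory stand-in for the filter / rename / unique / join steps.
// Each triple is {subject, predicate, object}; all names are hypothetical.
final class TargetObjectJoinSketch {

    static List<String[]> triplesSharingTypedObjects(List<String[]> triples, String typePredicate) {
        // Keep triples with the wanted predicate, project the object, de-duplicate.
        Set<String> targetObjects = new HashSet<>();
        for (String[] triple : triples) {
            if (typePredicate.equals(triple[1])) {
                targetObjects.add(triple[2]);
            }
        }
        // Join back against the full stream on object == target object.
        List<String[]> joined = new ArrayList<>();
        for (String[] triple : triples) {
            if (targetObjects.contains(triple[2])) {
                joined.add(triple);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> triples = Arrays.asList(
            new String[] {"a", "type", "Person"},
            new String[] {"b", "knows", "Person"},
            new String[] {"c", "name", "Bob"},
            new String[] {"d", "type", "City"});
        for (String[] triple : triplesSharingTypedObjects(triples, "type")) {
            System.out.println(String.join(" ", triple));
        }
    }
}
```

The set lookup plays the role of the join here. In the distributed flow, the CoGroup does the matching without holding all target objects in one JVM's memory.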

-- Ken


Thomas Scott

Jun 18, 2012, 2:57:30 PM
to cascadi...@googlegroups.com
Thank you so very much for your help Ken. That code snippet was close enough to what I wanted that I got there, with just a minor modification to some fields and a filter constructor that works in 2.0. Thanks again!

All the best,
-Thomas