Can Cascading help me with my use case?


Martijn van Leeuwen

May 23, 2013, 6:14:15 AM
to cascadi...@googlegroups.com
Hi All,

I am new to Cascading, so please be gentle...

I am currently working on a project which involves populating a graph database. Below is some background information about my project.

I am trying to create a graph that shows the relationships between entities (Person, Location, Organisation) and the document(s) they occur in. I use the Stanford NER to
extract the entities from the documents. I use a Cascading job to do this, and this part works fine. The output is as follows; it is TextDelimited with a tab as the delimiter.

Entities
doc_id    name         type      offset    length
1         John         Person    100       4
1         Joe Smith    Person    110       8
1         Ellis        Person    230       5
2         John         Person    100       4
2         Joe Smith    Person    435       8

My second job would be to create both nodes and edges from this source file. The output would be something like this.

Nodes
node_id    name         type
1          1            Document
2          2            Document
3          John         Person
4          Joe Smith    Person
5          Ellis        Person

This particular part of the job basically performs a Distinct on both the name and doc_id columns to create the nodes list. Does Cascading have something
like a Distinct function?
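
As an illustration only (plain Java, not Cascading code), the Distinct step described above could be sketched as follows. The names NodeListSketch and buildNodes are made up for this example, and it assumes each entity row has already been split into doc_id, name and type.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; NodeListSketch and buildNodes are hypothetical names.
class NodeListSketch {

    /** Builds "node_id<TAB>name<TAB>type" rows from entity rows of the form {doc_id, name, type}. */
    static List<String> buildNodes(List<String[]> entities) {
        // LinkedHashSet drops duplicates while keeping first-seen order.
        Set<String> docs = new LinkedHashSet<>();
        Set<String> named = new LinkedHashSet<>();
        for (String[] e : entities) {
            docs.add(e[0] + "\tDocument");
            named.add(e[1] + "\t" + e[2]);
        }
        // Number the document nodes first, then the entity nodes.
        List<String> nodes = new ArrayList<>();
        int nodeId = 1;
        for (String d : docs) {
            nodes.add(nodeId++ + "\t" + d);
        }
        for (String n : named) {
            nodes.add(nodeId++ + "\t" + n);
        }
        return nodes;
    }
}
```

Fed the five sample entity rows, this yields two Document nodes followed by three Person nodes, in the same order as the Nodes listing above.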

Edges
edge_id    source       target       relationship
1          1            John         contains
2          1            Joe Smith    contains
3          1            Ellis        contains
4          2            John         contains
5          2            Joe Smith    contains
6          John         1            occurs
7          Joe Smith    1            occurs
8          Ellis        1            occurs
9          John         2            occurs
10         Joe Smith    2            occurs

In order to create the edges list, I think something like a lookup needs to be done, like so:

Select name from nodes where type is Document, then use this name to look up all entities whose doc_id equals that name. This would give me the subset of
all entities that occur inside that particular document. Use this information to create the edge records... something like that.

My question: is this even possible with Cascading? And if so, could someone please point me in the right direction?

Kind regards,
Martijn 

Ken Krugler

May 23, 2013, 10:17:43 AM
to cascadi...@googlegroups.com
> My question: is this even possible with Cascading? And if so, could someone please point me in the right direction?

If I understand the goal correctly, it's pretty easy.

You'd first generate two Tuples for each incoming record (with fields "source", "target" and "relationship"):

doc_id name "contains"
name doc_id "occurs"

Then do a Unique, using all of the fields to remove dups. Something like:

        Pipe p = new Pipe("graph edges");

        // Strip out everything but the two fields we need
        p = new Each(p, new Fields("doc_id", "name"), new Identity());

        // Split the pipe, and output the original data, but renamed and with the new "contains" relationship.
        Pipe contains = new Pipe("contains", p);
        contains = new Rename(contains, new Fields("doc_id", "name"), new Fields("source", "target"));
        contains = new Each(contains, new Insert(new Fields("relationship"), "contains"), Fields.ALL);

        // Split the pipe, and output swapped fields, also renamed and with the new "occurs" relationship.
        Pipe occurs = new Pipe("occurs", p);
        occurs = new Rename(occurs, new Fields("name", "doc_id"), new Fields("source", "target"));
        occurs = new Each(occurs, new Insert(new Fields("relationship"), "occurs"), Fields.ALL);

        // Merge and deduplicate
        Pipe merged = new Merge("merged", contains, occurs);
        merged = new Unique(merged, new Fields("source", "target", "relationship"));

To generate actual edge_id values, the easy but less efficient approach is to run them through a single final reducer. If these don't have to be sequential, just unique, then you could use each reducer's task number multiplied by some big offset value as the starting point for that reducer's ids.
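
The second option can be sketched in plain Java as below. This is a hedged illustration, not Cascading API: EdgeIdSketch, OFFSET and nextId are hypothetical names, the offset value is an assumed upper bound on edges per task, and how the task number is obtained from the runtime is left out.

```java
// Hypothetical helper, not part of Cascading: unique (non-sequential) ids per reducer task.
class EdgeIdSketch {
    // Assumed upper bound on the number of edges a single task emits.
    static final long OFFSET = 1_000_000_000L;

    private final long base;
    private long next = 0;

    EdgeIdSketch(int taskNumber) {
        // Each task gets its own disjoint id range [taskNumber * OFFSET, (taskNumber + 1) * OFFSET).
        this.base = taskNumber * OFFSET;
    }

    long nextId() {
        if (next >= OFFSET) {
            throw new IllegalStateException("task emitted more ids than OFFSET allows");
        }
        return base + next++;
    }
}
```

Ids from different tasks cannot collide as long as no task emits more than OFFSET edges, which is why the offset has to be chosen generously.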

-- Ken

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr






Martijn van Leeuwen

May 23, 2013, 4:44:07 PM
to cascadi...@googlegroups.com
Hi Ken,

Your solution rocks!
It may seem easy to you, but for a newbie this is pretty complex. I need to read more docs and try more samples :-)
Thank you very much!

Kind regards,
Martijn 

On Thursday, May 23, 2013 at 16:17:43 UTC+2, kkrugler wrote:

Martijn van Leeuwen

May 24, 2013, 3:47:40 PM
to cascadi...@googlegroups.com
Hi,

Today I showed this to a colleague of mine; he was very excited to see this working.
But he had a few good questions that I couldn't answer... well, at least not all of them.
The first question was whether we can put a weight on the relationship by simply counting the occurrences. I solved it like this:

contains = new GroupBy(contains, new Fields("source", "target", "relationship"));
contains = new Every(contains, new Fields("source", "target", "relationship"), new Count(), new Fields("source", "target", "relationship", "count"));

I don't know if this is the right solution, but it solved the problem for me.

The second question he had was a bit more complicated: he was wondering if you could also create relationships between two entities. Let's say we want to know which entities
know one another.

For the test case we defined that entities that are within 50 characters of one another have a relationship.
Looking at the entity list, this would mean that within the document with id 1 the following two entities have a relationship:

doc_id    name         type      offset    length
1         John         Person    100       4
1         Joe Smith    Person    110       8

Does Cascading have a way of comparing tuples with each other, or is this something Cascading cannot do?
I have been trying for a couple of hours now and I don't have a clue...

Kind regards,
Martijn





On Thursday, May 23, 2013 at 22:44:07 UTC+2, Martijn van Leeuwen wrote:

Paco Nathan

May 24, 2013, 8:50:39 PM
to cascadi...@googlegroups.com
Hi Martijn,

If I understand correctly, you could do a GroupBy on the doc_id, then within every group you could perform those comparisons. Would have to write code for the comparisons -- ostensibly a Levenshtein distance from what you described -- and run it in a nested loop.

Paco



--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user?hl=en.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Martijn van Leeuwen

May 25, 2013, 3:47:43 AM
to cascadi...@googlegroups.com
Hi Paco,

From what I understand, a GroupBy does nothing more than order the tuple stream by the given field or fields. It doesn't send a group of tuples to, let's say, an Each in order to
do some custom operation on that particular group of tuples... or do I have it totally wrong?

Reading your comment, I think I have to do something like this:

Pipe p = new Pipe("edges");
p = new GroupBy("document order", p, new Fields("doc_id"));
p = new Each(p, Fields.ALL, new GetRelatedEntities(), new Fields("source", "target", "relationship"));

Is this correct?  
 
On Saturday, May 25, 2013 at 02:50:39 UTC+2, Paco Nathan wrote:

Chris K Wensel

May 25, 2013, 10:33:02 AM
to cascadi...@googlegroups.com
You want to use the Every pipe to handle every grouping.

http://docs.cascading.org/cascading/2.1/userguide/html/ch03s03.html#N20422

The Each pipe applies operations that are subclasses of Functions and Filters (described in the Javadoc). For example, using Each you can parse lines from a logfile into their constituent fields, filter out all lines except the HTTP GET requests, and replace the timestring fields with date fields.

Similarly, since the Every pipe works on tuple groups (the output of a GroupBy or CoGroup pipe), it applies operations that are subclasses of Aggregators and Buffers. For example, you could use GroupBy to group the output of the above Each pipe by date, then use an Every pipe to count the GET requests per date. The pipe would then emit the operation results as the date and count for each group.
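
As a rough analogy only, the logfile example above can be mimicked with plain Java streams. None of this is Cascading API: EachEverySketch and getRequestsPerDate are hypothetical names, and the "date method path" line format is an assumption made for the sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Analogy in plain Java streams; none of this is Cascading API.
class EachEverySketch {

    /** Input lines look like "2013-05-25 GET /index.html" (an assumed format). */
    static Map<String, Long> getRequestsPerDate(List<String> lines) {
        return lines.stream()
                .map(line -> line.split(" "))              // per-record function, like Each + Function
                .filter(parts -> parts[1].equals("GET"))   // per-record filter, like Each + Filter
                .collect(Collectors.groupingBy(            // grouping, like GroupBy on the date field
                        parts -> parts[0],
                        TreeMap::new,
                        Collectors.counting()));           // per-group aggregate, like Every + Count
    }
}
```

The per-record steps see one line at a time, while the counting step only makes sense once records have been grouped, which mirrors the Each versus Every distinction.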

Martijn van Leeuwen

May 25, 2013, 11:33:23 AM
to cascadi...@googlegroups.com
I created the following flow:

    Pipe p = new Pipe("graph edges");

    p = new GroupBy("document order", p, new Fields("doc_id"));
    p = new Every(p, new GetRelatedEntities(), Fields.ALL);
    p = new Each(p, new Insert(new Fields("relationship"), "directed"), Fields.ALL);

    FlowDef nerFlowDef = FlowDef.flowDef()
        .addSource(p, nerSourceTap)
        .addTailSink(p, nerDestinationTap);

    Flow nerFlowConnector = flowConnector.connect(nerFlowDef);
    nerFlowConnector.complete();

This is what my custom Aggregator looks like:

public class GetRelatedEntities extends BaseOperation<GetRelatedEntities.Context>
        implements Aggregator<GetRelatedEntities.Context> {

    public static class Context {
        HashMap<String, Integer> map = new HashMap<String, Integer>();
    }

    @Override
    public void start(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
        aggregatorCall.setContext(new Context());
    }

    @Override
    public void aggregate(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
        TupleEntry arguments = aggregatorCall.getArguments();
        Context context = aggregatorCall.getContext();
        context.map.put(arguments.getString(1), Integer.parseInt(arguments.getString(3)));
    }

    @Override
    public void complete(FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall) {
        Context context = aggregatorCall.getContext();
        // Now for the relations:
        // loop through the hash map, capture the first entry, capture the second entry and its value,
        // compare the distance; if the distance is less than 50 characters, emit a relationship.
        Integer nodeId = 0;
        Map.Entry<String, Integer> _node = null;
        Map.Entry<String, Integer> _currentnode = null;

        // Loop through the map and find related entities.
        HashMap<String, Integer> x = context.map;
        Iterator i = context.map.entrySet().iterator();
        while (i.hasNext()) {
            _node = (Map.Entry) i.next();
            for (int n = 0; n < x.size(); n++) {
                _currentnode = (Map.Entry) x.entrySet().toArray()[n];
                int distance = Math.abs(_currentnode.getValue() - _node.getValue());
                if (distance > 0 && distance < 50) {
                    Tuple result = new Tuple();
                    result.addString(_node.getKey());
                    result.addString(_currentnode.getKey());
                    aggregatorCall.getOutputCollector().add(result);
                }
            }
        }
    }
}

I ran a test on a 14MB file containing over 170K records; it took almost an hour! Any suggestions to speed up my job?

Kind regards,
Martijn

On Thursday, May 23, 2013 at 12:14:15 UTC+2, Martijn van Leeuwen wrote:

Ken Krugler

May 25, 2013, 12:29:24 PM
to cascadi...@googlegroups.com
Hi Martijn,

On May 24, 2013, at 12:47pm, Martijn van Leeuwen wrote:

> Hi,
>
> Today I showed this to a colleague of mine; he was very excited to see this working.
> But he had a few good questions that I couldn't answer... well, at least not all of them.
> The first question was whether we can put a weight on the relationship by simply counting the occurrences. I solved it like this:
>
> contains = new GroupBy(contains, new Fields("source", "target", "relationship"));
> contains = new Every(contains, new Fields("source", "target", "relationship"), new Count(), new Fields("source", "target", "relationship", "count"));
>
> I don't know if this is the right solution, but it solved the problem for me.

It does work, though a CountBy() would be more efficient, versus GroupBy + Every, since it will do map-side counting and thus reduce data being "shuffled" to the reduce phase.
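
To illustrate why map-side counting shrinks the shuffle, here is a minimal plain-Java sketch of partial aggregation. It is not Cascading code, and PartialCountSketch, partialCount and mergeCounts are made-up names; the idea is only that each input split is pre-counted before anything is sent on, so at most one record per distinct key leaves each split.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of partial ("map-side") aggregation; not Cascading code.
class PartialCountSketch {

    /** Counts keys within one input split, as a map task would before the shuffle. */
    static Map<String, Long> partialCount(List<String> split) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : split) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    /** Sums the partial counts, as the reduce side would after the shuffle. */
    static Map<String, Long> mergeCounts(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> total.merge(key, count, Long::sum));
        }
        return total;
    }
}
```

With a plain GroupBy + Every, every input record would cross the shuffle; with pre-counting, a split containing the same edge many times forwards a single (edge, count) pair for it.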

> The second question he had was a bit more complicated: he was wondering if you could also create relationships between two entities. Let's say we want to know which entities
> know one another.
>
> For the test case we defined that entities that are within 50 characters of one another have a relationship.
> Looking at the entity list, this would mean that within the document with id 1 the following two entities have a relationship:
>
> doc_id    name         type      offset    length
> 1         John         Person    100       4
> 1         Joe Smith    Person    110       8
>
> Does Cascading have a way of comparing tuples with each other, or is this something Cascading cannot do?
> I have been trying for a couple of hours now and I don't have a clue...

I think the solution you posted would work, but it has a lot of memory allocations going on (hash map, new tuple, etc.).

Here's a different approach that leverages Cascading's support for sorting values as part of a GroupBy:

    // Needs java.util.{Iterator, LinkedList, Queue}, cascading.flow.FlowProcess,
    // cascading.operation.{BaseOperation, Buffer, BufferCall, OperationCall}
    // and cascading.tuple.{Fields, Tuple, TupleEntry}.
    private static class GetRelatedEntities extends BaseOperation implements Buffer {

        private int _maxDistance;

        private transient Queue<TupleEntry> _entityWindow;
        private transient Tuple _knowsRelationship1;
        private transient Tuple _knowsRelationship2;

        public GetRelatedEntities(int maxDistance) {
            super(new Fields("source", "target", "relationship"));
            _maxDistance = maxDistance;
        }

        @Override
        public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
            super.prepare(flowProcess, operationCall);

            _entityWindow = new LinkedList<TupleEntry>();
            _knowsRelationship1 = new Tuple("", "", "knows");
            _knowsRelationship2 = new Tuple("", "", "knows");
        }

        @Override
        public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
            // On entry we get a group by doc_id, with entities sorted by offset.
            // If two entities are close enough together, we want to emit two new
            // relationships, <a>, <b>, "knows" and <b>, <a>, "knows". To keep
            // track, we'll use a list of objects, and whenever the distance from
            // the head to the new object is > _maxDistance, we can flush the
            // head.

            _entityWindow.clear();

            Iterator<TupleEntry> iter = bufferCall.getArgumentsIterator();
            while (iter.hasNext()) {
                // Get the next entity.
                TupleEntry curEntity = iter.next();
                int entityLocation = curEntity.getInteger("offset");

                // Flush everything from the front of the queue that's outside the offset window for this entity.
                int windowStartOffset = entityLocation - _maxDistance;
                while (!_entityWindow.isEmpty() && (_entityWindow.peek().getInteger("offset") < windowStartOffset)) {
                    _entityWindow.remove();
                }

                // Now generate "knows" relationships for everything left in the queue, when combined with the
                // current entity.
                String curEntityName = curEntity.getString("name");
                _knowsRelationship1.set(0, curEntityName);
                _knowsRelationship2.set(1, curEntityName);

                for (TupleEntry prevEntity : _entityWindow) {
                    // Fill in the previous entity's name
                    String prevEntityName = prevEntity.getString("name");
                    _knowsRelationship1.set(1, prevEntityName);
                    bufferCall.getOutputCollector().add(_knowsRelationship1);

                    _knowsRelationship2.set(0, prevEntityName);
                    bufferCall.getOutputCollector().add(_knowsRelationship2);
                }

                // Now push the current entity onto the queue. We have to copy the TupleEntry since it
                // will get re-used for each call to us.
                _entityWindow.add(new TupleEntry(curEntity));
            }
        }

    }

    

    

    @Test
    public void testGraphGeneration() throws Exception {
        BasePlatform platform = new LocalPlatform(GraphGenerationTest.class);

        Fields sourceFields = new Fields("doc_id", "name", "type", "offset", "length");
        BasePath in = platform.makePath("build/test/GraphGenerationTest/testGraphGeneration/in");
        Tap sourceTap = platform.makeTap(platform.makeBinaryScheme(sourceFields), in);
        TupleEntryCollector writer = sourceTap.openForWrite(platform.makeFlowProcess());

        writer.add(new Tuple("1", "John", "Person", 100, 4));
        writer.add(new Tuple("1", "Joe Smith", "Person", 110, 8));
        writer.add(new Tuple("1", "Ellis", "Person", 230, 5));

        writer.add(new Tuple("2", "John", "Person", 100, 4));
        writer.add(new Tuple("2", "Joe Smith", "Person", 435, 8));
        writer.close();

        Pipe p = new Pipe("graph edges");

        // Split the pipe, and create "knows" relationships, based on the
        // distance between two entities in the same document. We sort by offset in the GroupBy,
        // so that the GetRelatedEntities Buffer can leverage that to avoid keeping everything
        // around in memory.
        Pipe knowsPipe = new Pipe("knows", p);
        knowsPipe = new Each(knowsPipe, new Fields("doc_id", "name", "offset"), new Identity());
        knowsPipe = new GroupBy(knowsPipe, new Fields("doc_id"), new Fields("offset"));
        knowsPipe = new Every(knowsPipe, new GetRelatedEntities(50), Fields.RESULTS);

        // Strip out everything but the two fields we need downstream from here.
        p = new Each(p, new Fields("doc_id", "name"), new Identity());

        // Split the pipe, and output the original data, but renamed and with the new "contains" relationship.
        Pipe containsPipe = new Pipe("contains", p);
        containsPipe = new Rename(containsPipe, new Fields("doc_id", "name"), new Fields("source", "target"));
        containsPipe = new Each(containsPipe, new Insert(new Fields("relationship"), "contains"), Fields.ALL);

        // Split the pipe, and output swapped fields, also renamed and with the new "occurs" relationship.
        Pipe occursPipe = new Pipe("occurs", p);
        occursPipe = new Rename(occursPipe, new Fields("name", "doc_id"), new Fields("source", "target"));
        occursPipe = new Each(occursPipe, new Insert(new Fields("relationship"), "occurs"), Fields.ALL);
        occursPipe = new Each(occursPipe, new Debug(true));

        // Merge and generate a count of each relationship.
        Pipe merged = new Merge("merged", containsPipe, occursPipe, knowsPipe);
        merged = new CountBy(merged, new Fields("source", "target", "relationship"), new Fields("count"));

        BasePath out = platform.makePath("build/test/GraphGenerationTest/testGraphGeneration/out");
        Tap sinkTap = platform.makeTap(platform.makeTextScheme(), out, SinkMode.REPLACE);

        Flow f = platform.makeFlowConnector().connect(sourceTap, sinkTap, merged);
        f.complete();
    }
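
For anyone who wants to try the windowing idea without a Cascading setup, here is a minimal plain-Java sketch of the same technique. It is an illustration under assumptions, not the Buffer above: WindowSketch, Entity and relatedPairs are hypothetical names, and the input is assumed to be the entities of a single document, already sorted by offset.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java sketch of the windowing idea; WindowSketch, Entity and relatedPairs are hypothetical names.
class WindowSketch {

    static final class Entity {
        final String name;
        final int offset;

        Entity(String name, int offset) {
            this.name = name;
            this.offset = offset;
        }
    }

    /** Expects the entities of ONE document, already sorted by offset (ascending). */
    static List<String[]> relatedPairs(List<Entity> sortedByOffset, int maxDistance) {
        List<String[]> edges = new ArrayList<>();
        Deque<Entity> window = new ArrayDeque<>();
        for (Entity cur : sortedByOffset) {
            // Drop entities that start more than maxDistance before the current one.
            while (!window.isEmpty() && window.peekFirst().offset < cur.offset - maxDistance) {
                window.removeFirst();
            }
            // Everything still in the window is close enough: emit both directions.
            for (Entity prev : window) {
                edges.add(new String[] {cur.name, prev.name, "knows"});
                edges.add(new String[] {prev.name, cur.name, "knows"});
            }
            window.addLast(cur);
        }
        return edges;
    }
}
```

Because the input is sorted, each entity enters and leaves the window once, so memory stays bounded by the number of entities within maxDistance rather than by the size of the document.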


-- Ken






Martijn van Leeuwen

May 26, 2013, 5:03:08 AM
to cascadi...@googlegroups.com
Wow Ken! This works like a charm! This gives me great insight into the possibilities of Cascading!
Many thanks!

Martijn

On Thursday, May 23, 2013 at 12:14:15 UTC+2, Martijn van Leeuwen wrote: