applying DistCacheTap selectively to HashJoin rhs

Ruban Monu

Aug 2, 2016, 12:55:48 PM
to cascadi...@googlegroups.com, Piyush Narang
Is there a way (in Cascading 3) to walk the flow plan and apply DistCacheTap to any source tap that is the rhs of a HashJoin?

This is re https://github.com/twitter/scalding/issues/1103. In Scalding, I don't think we can know if a source is going to be used for HashJoin at the time we call createTap on it.

The solution in our current fork finds "accumulated" sources in HadoopFlowStep and enables distributed cache for those sources:
https://github.com/twitter/cascading/commit/8271526443c9ef832415df5d9673fde3e4391620

From what I can tell, there is no way to look at the pipes and wrap their source taps in DistCacheTap after the pipes have been created?

I see support for decorating all temporary taps or checkpoint taps in FlowConnectorProps, but nothing that can be applied to sources that are accumulated. (It's probably not possible to do that without first checkpointing the source and then wrapping the resulting tap?)

I'm hoping there's a simple solution for this that I've completely missed.

Thanks!

-Ruban

Chris K Wensel

Aug 2, 2016, 3:59:25 PM
to cascadi...@googlegroups.com, Piyush Narang
fwiw, on Tez, this is not an issue as a broadcast edge is always used.

for MapReduce, my recommendation would be to update the rules in MapReduceHadoopRuleRegistry to use an alternate IntermediateTapElementFactory implementation that returns a DistCacheTap.

see line 41


when that rule fires, and others like it, they create a standard temp tap from the registered factory. just create a new factory, and update the rules to use it.
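
As a rough illustration of the factory swap described above, here is a minimal plain-Java sketch. It is a toy model, not Cascading code: `Registry`, `TempTap`, `DistCacheWrapper`, and the `"temp-tap"` key are hypothetical names invented for this example, and the real `IntermediateTapElementFactory` and rule classes will have different signatures. The only point it makes is that a rule which looks up its factory at fire time can produce a different kind of tap without the rule itself changing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only: none of these types are Cascading classes.
class FactorySwapSketch {

    interface Tap {
        String describe();
    }

    /** Stand-in for the standard temporary tap a rule would normally insert. */
    static final class TempTap implements Tap {
        public String describe() {
            return "TempTap";
        }
    }

    /** Stand-in for a decorator that serves its inner tap via the distributed cache. */
    static final class DistCacheWrapper implements Tap {
        private final Tap inner;

        DistCacheWrapper(Tap inner) {
            this.inner = inner;
        }

        public String describe() {
            return "DistCache(" + inner.describe() + ")";
        }
    }

    /** Hypothetical registry: a rule asks for a factory by name each time it fires. */
    static final class Registry {
        private final Map<String, Supplier<Tap>> factories = new HashMap<>();

        void register(String name, Supplier<Tap> factory) {
            factories.put(name, factory);
        }

        Tap fire(String factoryName) {
            return factories.get(factoryName).get();
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();

        // Default behaviour: the rule gets a plain temp tap.
        registry.register("temp-tap", TempTap::new);
        System.out.println(registry.fire("temp-tap").describe());

        // Swap the registered factory; the rule that fires is unchanged.
        registry.register("temp-tap", () -> new DistCacheWrapper(new TempTap()));
        System.out.println(registry.fire("temp-tap").describe());
    }
}
```

Running `main` prints `TempTap` and then `DistCache(TempTap)`.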

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at https://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/CAAuNgTYTXp5U4MAccZ2qR9JHpWuGA5ySMkqzDCOO_wk6tTtYkQ%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Ruban Monu

Aug 4, 2016, 5:10:30 PM
to cascadi...@googlegroups.com, Piyush Narang
What's the right way to match the rhs of a HashJoin in an ExpressionGraph?

I have a basic rule (based on TapBalanceCheckpointTransformer / BalanceCheckpointWithTapExpression), which iiuc matches a HashJoin and attempts to add a DistCacheTap for reading the HashJoin output:

class HashJoinExpression extends RuleExpression(
  new ExpressionGraph().arcs(
    new FlowElementExpression(ElementCapture.Primary, true, classOf[HashJoin]),
    not(new FlowElementExpression(classOf[Tap[_, _, _]]))))

class HashJoinTransformer extends RuleInsertionTransformer(
  BalanceAssembly,
  new HashJoinExpression,
  DistCacheTapElementFactory.DIST_CACHE_TAP) // custom factory to return a DistCacheTap

Thanks.


Chris K Wensel

Aug 4, 2016, 7:16:34 PM
to cascadi...@googlegroups.com, Piyush Narang
I would not add NEW rules (that won't work), but update the EXISTING rules to use a new factory.

the rules that add a temp Tap before the HashJoin should be obvious in the naming.

that is, just change the rule to grab a different factory.

(And I would NOT write it in Scala, since then you can't send it as a pull request and contribute the work back. That assumes using DistCacheTap globally works at scale, which I hope you prove it does.)

ckw

Ruban Monu

Aug 5, 2016, 12:59:18 PM
to cascadi...@googlegroups.com, Piyush Narang
Makes sense about Java and pushing this upstream if it ends up working well.

Looking at MapReduceHadoopRuleRegistry, I can't find a rule that universally matches the rhs of a HashJoin. The ones referring to HashJoin are dealing with a few special cases iiuc? I might be completely misreading the code though.

If I have a simple job like below (borrowed code from tests), is there an existing rule that would match this?

Thanks.
==

Map<String, Tap> sources = new HashMap<String, Tap>();
sources.put( "lhs", getPlatform().getDelimitedFile( ... ) );
sources.put( "rhs", getPlatform().getDelimitedFile( ... ) );

Tap sink = getPlatform().getDelimitedFile( ... );

Pipe pipe1 = new Pipe( "lhs" );
Pipe pipe2 = new Pipe( "rhs" );
Pipe join = new HashJoin( pipe1, fields1, pipe2, fields2 ); // pipe2 ("rhs") is the accumulated side

getPlatform().getFlowConnector().connect( sources, sink, join );



Chris K Wensel

Aug 5, 2016, 2:10:02 PM
to cascadi...@googlegroups.com, Piyush Narang
Any rule matching for a PathScopeExpression.BLOCKING edge is the accumulated path.

for example see BalanceHashJoinBlockingHashJoinExpression.

That said in hindsight, it might be interesting to write a rule that comes last in the BalanceAssembly phase that looks for

TempTap — blocking —> HashJoin (and isn’t a self-join)

and wrap the TempTap with a DistCacheTap. This may require a new Insertion type for Replace to keep it generic.

This lets all the other rules fire; then we just use metadata on the edges to optimize things a bit.
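
The matching logic of such a rule can be sketched in plain Java on a toy graph. This is only an illustration under stated assumptions: `Node`, `Edge`, `EdgeKind`, and `applyRule` are hypothetical names, not Cascading's planner types, and a `distCached` flag stands in for actually wrapping the tap in a DistCacheTap. A self-join is approximated here as "the same tap reaches the same join over more than one edge".

```java
import java.util.ArrayList;
import java.util.List;

// Toy plan graph; these are not Cascading's planner types.
class BlockingEdgeRuleSketch {

    enum EdgeKind { STREAMED, BLOCKING }

    static final class Node {
        final String kind; // e.g. "TempTap" or "HashJoin"
        final String name;
        boolean distCached; // stands in for "wrapped in a DistCacheTap"

        Node(String kind, String name) {
            this.kind = kind;
            this.name = name;
        }
    }

    static final class Edge {
        final Node from;
        final Node to;
        final EdgeKind kind;

        Edge(Node from, Node to, EdgeKind kind) {
            this.from = from;
            this.to = to;
            this.kind = kind;
        }
    }

    /** True when some other edge connects the same tap to the same join. */
    static boolean isSelfJoin(List<Edge> edges, Edge candidate) {
        for (Edge other : edges) {
            if (other != candidate && other.from == candidate.from && other.to == candidate.to) {
                return true;
            }
        }
        return false;
    }

    /** Marks each TempTap feeding a HashJoin over a blocking edge; returns the marked names. */
    static List<String> applyRule(List<Edge> edges) {
        List<String> marked = new ArrayList<>();
        for (Edge edge : edges) {
            boolean matches = edge.kind == EdgeKind.BLOCKING
                && edge.from.kind.equals("TempTap")
                && edge.to.kind.equals("HashJoin")
                && !isSelfJoin(edges, edge);
            if (matches && !edge.from.distCached) {
                edge.from.distCached = true;
                marked.add(edge.from.name);
            }
        }
        return marked;
    }

    public static void main(String[] args) {
        Node lhs = new Node("TempTap", "lhs");
        Node rhs = new Node("TempTap", "rhs");
        Node both = new Node("TempTap", "both");
        Node join = new Node("HashJoin", "join");
        Node selfJoin = new Node("HashJoin", "selfJoin");

        List<Edge> edges = new ArrayList<>();
        edges.add(new Edge(lhs, join, EdgeKind.STREAMED));
        edges.add(new Edge(rhs, join, EdgeKind.BLOCKING));
        edges.add(new Edge(both, selfJoin, EdgeKind.STREAMED));
        edges.add(new Edge(both, selfJoin, EdgeKind.BLOCKING));

        System.out.println(applyRule(edges)); // prints [rhs]
    }
}
```

Only `rhs` is marked: `lhs` is streamed, and `both` is excluded because it feeds both sides of the same join.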

Ruban Monu

Aug 9, 2016, 12:23:35 PM
to cascadi...@googlegroups.com, Piyush Narang
I have my code changes so far posted on https://github.com/twitter/scalding/pull/1585

It tries to use a general rule to match any Hfs -> blocking -> HashJoin chain. The RuleExpression likely needs some more tweaking. A review would be great. Thanks!

Piyush Narang

Aug 16, 2016, 5:15:46 PM
to cascading-user, pna...@twitter.com
Looking at Ruban's PR - https://github.com/twitter/scalding/pull/1585, I'm wondering if a better home for a lot of those changes is in Cascading instead of Scalding?
I could move the classes ReplaceGraphFactoryBasedTransformer, RuleReplaceFactoryBasedTransformer, DistCacheTapElementFactory, HashJoinExpression, and HashJoinDistCacheTransformer into a Cascading PR. We can leave the rule disabled by default and retain the DistCacheEnabledHadoopPlanner in Scalding.

Thanks,


Chris K Wensel

Aug 16, 2016, 6:07:42 PM
to cascadi...@googlegroups.com, pna...@twitter.com
I would be fine with using the DistCacheTap always (by updating the current rule registry for MR) if it always works and is a performance improvement. This of course means the platform tests pass and there are some assurances the code is well tested and stable in a production environment.

Otherwise, having the alternate rule registry available as an option while it's experimental or fussy is fine too.

So yes, a PR against wip-3.2 would be great. But I ask that it be proven to be either stable or fussy before making the request, so we can integrate/document it appropriately.


ckw


Piyush Narang

Aug 16, 2016, 6:24:03 PM
to cascading-user, pna...@twitter.com
Ok, I'll start adding the code to the wip-3.2 branch and some tests to IsomorphismTest to go with it. Once I've got the tests etc passing, we can cut an internal release and try it out on a few of our internal jobs (and sanity check with our E2E job suite). We can take a call based on that (and if you have other users to try this out with). 

Piyush Narang

Aug 19, 2016, 7:23:48 PM
to cascading-user, pna...@twitter.com
Thought I'd circle back with some updates. I've put up a PR with my changes: https://github.com/cwensel/cascading/pull/55. Would be great if someone could take a look. 

I was able to test this out on a few hashJoin jobs internally and was able to verify that the dist cache was being applied on the rhs. I do end up seeing a 10-20% reduction in HDFS bytes read / read ops on one of my jobs that had around 1300 map tasks which are performing the hashJoin (size of rhs in HDFS was around 100MB). Runtime of the two jobs was comparable. 

When we ran into this issue a few months back (on Cascading 2), we had a job that put a lot of load on our Hadoop cluster, as it had a lot of map tasks and each mapper was trying to read the rhs data. I wasn't able to find the specific jobs that caused this to test the change with, so I tried to simulate that behavior with my 1300-mapper job.
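
A back-of-envelope estimate shows why per-mapper reads of the rhs add up. The sketch below takes the 1300 map tasks and roughly 100 MB rhs from the post above, and it assumes that with the distributed cache the file is fetched about once per node rather than once per task. The node count of 200 is purely hypothetical and does not come from this thread, so the second figure is illustrative only.

```java
// Rough arithmetic only; the cluster size below is an assumption, not a measurement.
class RhsReadEstimate {

    /** MB of rhs read from HDFS when every map task loads the rhs itself. */
    static long perTaskReadMb(long mapTasks, long rhsMb) {
        return mapTasks * rhsMb;
    }

    /** MB of rhs read when the file is localized once per node (an assumption). */
    static long perNodeReadMb(long nodes, long rhsMb) {
        return nodes * rhsMb;
    }

    public static void main(String[] args) {
        long mapTasks = 1300; // from the post above
        long rhsMb = 100;     // approximate rhs size from the post above
        long nodes = 200;     // hypothetical cluster size, not from the thread

        System.out.println(perTaskReadMb(mapTasks, rhsMb) + " MB without caching");
        System.out.println(perNodeReadMb(nodes, rhsMb) + " MB with per-node caching");
    }
}
```

With these inputs the per-task figure is 130000 MB and the per-node figure is 20000 MB. Total HDFS bytes read for a job also include the lhs, so the overall saving measured on a real job will be a smaller fraction than this rhs-only comparison suggests.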

I've reached out to some teams (with hashJoin jobs) internally to get them to try these changes out. I think that step will probably take a few weeks based on when they get the time. In the PR as of now, I've turned off these changes. We could create overrides in Scalding which allow us to try out these changes as part of our internal Cascading 3 roll out (basically keep the changes in ScaldingHadoopFlowProcess - https://github.com/twitter/scalding/pull/1585/files). A few months down the line, as our internal users pick this up, we would have a decent number of hashJoin jobs using this change internally. 

Thanks, 

Ken Krugler

Aug 20, 2016, 12:17:45 AM
to cascadi...@googlegroups.com
Hi Piyush,

This seems interesting. Did you run into any job failures with Cascading 2.x/HashJoins previously?

Asking because a while back we’d experienced failures on jobs using HashJoins with many map tasks (this is with Cascading 2.x, don’t have the exact version), and wound up having to revert to regular CoGroups to get the reliability we needed.

— Ken

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Piyush Narang

Aug 22, 2016, 2:40:26 PM
to cascading-user
Hi Ken,

Yeah, I believe we did. I was chatting with one of the folks on my team who's been around longer, and he mentioned that we'd seen issues when the number of mappers was much higher than what I'd tested above (I tested 1.3K; he mentioned issues in the 20-30K range). I tweaked my test job to make the Hadoop split sizes smaller to try a run with a lot more mappers, and ended up with a job that had 21K mappers (and 250 reducers). Running the job with my changes (https://github.com/cwensel/cascading/pull/55) succeeded. When I tried a branch without my changes, the job failed with the error:
Error: cascading.tuple.TupleException: unable to read from input identifier: viewfs://hadoop-dw2-nn.smf1.twitter.com/tables/statuses/2016/07/24/21/statuses-20160724210000-20160724220000.lzo
	at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:152)
	at cascading.flow.stream.element.SourceStage.map(SourceStage.java:84)
	at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
	at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:139)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:180)
	...

Looking at some of our old internal jiras, this was one of the manifested failure scenarios. We were running a fork of Cascading 2.6.x from what I understand. 

Piyush Narang

Aug 22, 2016, 3:27:35 PM
to cascading-user
I was missing a part of the stack in my previous reply. I believe the BlockMissingException is because we're hitting the data nodes too aggressively due to not caching the rhs (the topN path is our rhs dataset).
cascading.tuple.TupleException: unable to read from input identifier: viewfs://hadoop-dw2-nn.smf1.twitter.com/tables/statuses/2016/07/27/19/statuses-20160727190000-20160727200000.lzo
	at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:152)
	at cascading.flow.stream.element.SourceStage.map(SourceStage.java:84)
	at cascading.flow.stream.element.SourceStage.run(SourceStage.java:66)
	at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:139)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:180)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1645)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:175)
Caused by: org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1748500278-10.52.50.140-1377803467793:blk_2305124715_1101319970837 file=/foo/bar/topN/part-00000
	at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:897)
	at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:568)
	at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:803)
	at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:849)
	at java.io.DataInputStream.read(DataInputStream.java:100)
	at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
	at org.apache.hadoop.mapred.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:206)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:244)
	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
	at cascading.tap.hadoop.util.MeasuredRecordReader.next(MeasuredRecordReader.java:61)
	at cascading.scheme.hadoop.TextDelimited.source(TextDelimited.java:1005)
	at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
	at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
	... 10 more

Ken Krugler

Aug 22, 2016, 5:34:08 PM
to cascadi...@googlegroups.com
Hi Piyush,

Thanks for including the exception. And yes, I’d run into exactly the same issue (with AWS Elastic Mapreduce) about a year ago, when using a HashJoin.

— Ken




Piyush Narang

Aug 22, 2016, 6:55:40 PM
to cascading-user
Ah that's interesting. If you still have access to that job / data you could try running it with my branch to test if it works for you.

Piyush Narang

Aug 25, 2016, 8:10:37 PM
to cascading-user, pna...@twitter.com
Thought I'd ping to see if someone would have some time to take a look at the PR I'd put out for this? PR is: https://github.com/cwensel/cascading/pull/55

Thanks,