run cascading job with multiple reducers


Dilip K

Jul 30, 2015, 6:27:09 PM
to cascading-user
I have a simple Cascading workflow with a custom buffer join implemented to perform a Cartesian join on a stream of tuples. I ran the job using the MR, MR2, and Tez flow connectors, but it always runs with 2 mappers and a single reducer. I added the following properties to the flow connector, but the job still runs with a single reducer and takes hours to complete. Please advise if I am missing something or if there are additional properties to set.

MR2Flow connector: "mapreduce.job.reduces", 10
TEZ Flow connector: "cascading.flow.runtime.gather.partitions.num", Integer.toString( 10 )

Andre Kelpe

Jul 31, 2015, 4:10:42 AM
to cascading-user

Cascading uses the mapred API, so you should set mapred.reduce.tasks to the value you want in your properties.
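For example, a minimal sketch of wiring that property into the flow connector (Main and the Hadoop2MR1FlowConnector here stand in for your own setup):

import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.property.AppProps;

Properties properties = new Properties();
// the value must be a String; Properties silently ignores non-String values
properties.put("mapred.reduce.tasks", "10");
AppProps.setApplicationJarClass(properties, Main.class);
FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);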

André


Dilip K

Jul 31, 2015, 10:16:30 AM
to cascading-user, ake...@concurrentinc.com
Thanks Andre. I have added mapred.reduce.tasks, but the job still runs with one reducer. One thing I noticed is that at the beginning of the job YARN shows 20 containers allocated, then within the next 10 seconds the containers drop to 1, and it runs with 1 until the job completes (with one reducer the job takes around 1 hour 20 minutes to process 5 million tuples).

Thanks
Dilip Kari

Ken Krugler

Jul 31, 2015, 10:33:34 AM
to cascadi...@googlegroups.com
If you only have one key value being used for the Cartesian join, then you'll only have one reduce task.

-- Ken


From: Dilip K

Sent: July 31, 2015 7:16:30am PDT

To: cascading-user

Cc: ake...@concurrentinc.com

Subject: Re: run cascading job with multiple reducers




--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Dilip K

Jul 31, 2015, 12:10:50 PM
to cascading-user, kkrugle...@transpac.com
Hi Ken,

I am doing a Cartesian join on 2000 tuples (with 150 fields each) to find the similarity between each user and all other users, which means I will end up with 2000 * 2000 tuples.
Please let me know what you are referring to as "one key value" here.

Thanks
Dilip Kari

Ken Krugler

Jul 31, 2015, 2:19:10 PM
to cascadi...@googlegroups.com


From: Dilip K

Sent: July 31, 2015 9:10:50am PDT

To: cascading-user

Cc: kkrugle...@transpac.com

Subject: Re: run cascading job with multiple reducers


Hi Ken,

I am doing a Cartesian join on 2000 tuples (with 150 fields each) to find the similarity between each user and all other users, which means I will end up with 2000 * 2000 tuples.
Please let me know what you are referring to as "one key value" here.

I assume you're doing a self-join (via GroupBy). Depending on the field(s) being used for the join, and the number of unique values for that/those field(s), you can wind up with heavily skewed partitioning.

See the thread titled "Reducers stalled while doing a Cartesian product" from about 6 years ago.

In that conversation, Mike provided enough details to figure out what was going wrong (example data, actual code, etc.)

Regards,

-- Ken




Chris K Wensel

Jul 31, 2015, 3:32:07 PM
to cascadi...@googlegroups.com
To visually verify the behavior of your application, drop in the Driven plugin.

The skew will be obvious if that's what's happening. (The EAP only works with Cascading 2.x currently; that will change soon.)



Dilip K

Jul 31, 2015, 3:42:43 PM
to cascading-user, kkrugle...@transpac.com
Thanks for your input, Ken.
Here is how I implemented the Cartesian product, through a custom BufferJoin used in a CoGroup.

// split the pipe into two branches
Pipe lhsUserFeatures = new Pipe("lhs-user-features", readCsvAssembly);
Pipe rhsUserFeatures = new Pipe("rhs-user-features", readCsvAssembly);

// perform a cross join by specifying no join fields
// handle join logic in Buffer implementation
Pipe userDistance = new CoGroup(lhsUserFeatures, Fields.NONE, rhsUserFeatures, Fields.NONE, new BufferJoin());
userDistance = new Every(userDistance, new EdgeDistanceBufferJoin(), new Fields("some output fields"));

Thanks
Dilip

Ken Krugler

Jul 31, 2015, 6:47:14 PM
to cascadi...@googlegroups.com
Using Fields.NONE for the grouping key will (I believe) give you a single group, with the Cartesian product of all records.

So that will wind up using a single reducer.

An alternative approach would be to save the upstream result (what you get from the readCsvAssembly pipe), and then do a GroupBy on Fields.ALL with just that pipe.

That will give you one group per unique record (spread out over all of the reducers).

Then in your custom Buffer function (modified version of EdgeDistanceBufferJoin), read in the upstream result (in memory if possible, or reopen for each group) and calc the distance.
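In sketch form, with savedCsvPath, rhsFields and DistanceBuffer as hypothetical stand-ins for your own names:

// every unique record becomes its own group, and the groups are
// partitioned across all of the reduce tasks
Pipe users = new GroupBy(readCsvAssembly, Fields.ALL);

// the Buffer re-reads the saved upstream result itself (no CoGroup),
// emitting one distance per (group record, saved record) pair
users = new Every(users, new DistanceBuffer(savedCsvPath, rhsFields), Fields.RESULTS);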

-- Ken


From: Dilip K

Sent: July 31, 2015 12:42:43pm PDT



Dilip K

Aug 1, 2015, 2:14:29 AM
to cascading-user
Ken,

Thank you so much for the detailed explanation.

I have added the GroupBy with Fields.ALL, and it works fine with the LocalFlowConnector.
When I run the same with the Hadoop2MR1FlowConnector/Hadoop2TezFlowConnector I get the following error. The upstream result comes from parsing a CSV file where the columns are dynamic for each run, so I can't define the field names on readCsvAssembly. new GroupBy(readCsvAssembly, Fields.ALL) fails because the pipe readCsvAssembly is a tuple stream with UNKNOWN fields. Please suggest a workaround if there is one.

2015-08-01 00:57:45,280 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : cascading.tuple.TupleException: unable to set into: [UNKNOWN], using selector: [UNKNOWN]
	at cascading.tuple.Tuple.set(Tuple.java:797)
	at cascading.flow.hadoop.HadoopGroupByClosure$1$1.makeResult(HadoopGroupByClosure.java:93)
	at cascading.flow.hadoop.HadoopGroupByClosure$1.next(HadoopGroupByClosure.java:120)
	at cascading.flow.hadoop.HadoopGroupByClosure$1.next(HadoopGroupByClosure.java:76)
	at cascading.pipe.joiner.InnerJoin$JoinIterator.initLastValues(InnerJoin.java:152)
	at cascading.pipe.joiner.InnerJoin$JoinIterator.next(InnerJoin.java:184)
	at cascading.pipe.joiner.InnerJoin$JoinIterator.next(InnerJoin.java:68)
	at cascading.tuple.TupleEntryChainIterator.next(TupleEntryChainIterator.java:97)
	at cascading.tuple.TupleEntryChainIterator.next(TupleEntryChainIterator.java:32)
	at cascading.flow.stream.duct.OpenDuct.receive(OpenDuct.java:45)
	at cascading.flow.stream.duct.OpenDuct.receive(OpenDuct.java:28)
	at cascading.flow.hadoop.stream.HadoopGroupGate.accept(HadoopGroupGate.java:141)
	at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:146)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: cascading.tuple.TupleException: given tuple not same size as position array: 0, tuple: ['00011324', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '1', '0', '0', '0', '0', '0']
	at cascading.tuple.Tuple.set(Tuple.java:759)
	

Thanks
Dilip

Ken Krugler

Aug 1, 2015, 9:27:26 AM
to cascadi...@googlegroups.com
It surprises me that Fields.ALL fails with the GroupBy - hoping Chris Wensel can jump in here.

Leaving that aside…

1. Sniff the CSV files to extract the header line (list of fields) in advance of constructing the workflow, so you don't have to use Fields.UNKNOWN (see the sketch after this list)

2. If you know that you'll have at least N fields coming out of the CSV parsing, then you could try doing a GroupBy with new Fields(0, 1, 2, 3, … N-1), thus using field positions vs. names.

You'll wind up with multiple records in each group, but that's easy enough to handle.
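For option 1, the header sniffing could look something like this (a sketch that assumes a comma-delimited file whose first line is the header; csvPath is a placeholder):

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import cascading.tuple.Fields;

FileSystem fs = FileSystem.get(new Configuration());
BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(csvPath))));
try {
    String header = reader.readLine();
    // use these instead of Fields.UNKNOWN for the source scheme and the GroupBy
    Fields csvFields = new Fields(header.split(","));
} finally {
    reader.close();
}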

-- Ken


From: Dilip K

Sent: July 31, 2015 11:14:28pm PDT

To: cascading-user




Dilip K

Aug 1, 2015, 4:43:39 PM
to cascading-user
Thanks Ken. I ran into an issue dealing with tuples with unknown fields earlier as well, and couldn't solve it. Will try again.
...

Chris K Wensel

Aug 1, 2015, 10:59:32 PM
to cascadi...@googlegroups.com
A pull request with a test against the version you are using would be helpful for understanding and resolving the issue.




Dilip K

Aug 2, 2015, 9:19:32 AM
to cascading-user
The version of Cascading I am using is 3.0.0, and the Maven repo being used is http://conjars.org/repo
GroupBy(pipe, Fields.ALL) on a pipe with UNKNOWN fields works fine with the LocalFlowConnector, but not with any of the Hadoop flow connectors.
...

Dilip K

Aug 2, 2015, 9:33:05 AM
to cascading-user
I am able to run the GroupBy using Fields.ALL, and with that, multiple reduce tasks are spawned for grouping the tuples. But the very next step, the BufferJoin on the same pipe, runs with a single reduce task. I tried a GroupBy on the LHS and RHS pipes of the CoGroup and still ended up with a single reducer.
Only Fields.NONE on the CoGroup gives the Cartesian product of the tuples; Fields.ALL or specific fields give very few tuples.

// Read tuples with all Fields from CSV
Function parseCsvFields = new ParseFeaturesCsvFields(getCsvFields(csvColumnCount));
readCsvAssembly = new Each(readCsvAssembly, new Fields("user-record"), parseCsvFields, Fields.RESULTS);

readCsvAssembly = new GroupBy(readCsvAssembly, Fields.ALL);

// split the pipe into two branches
Pipe lhsUserFeatures = new Pipe("lhs-user-features", readCsvAssembly);
// lhsUserFeatures = new GroupBy(lhsUserFeatures);

Pipe rhsUserFeatures = new Pipe("rhs-user-features", readCsvAssembly);
// rhsUserFeatures = new GroupBy(rhsUserFeatures);

Pipe userDistance = new Pipe("feature-matrix");
// perform a cross join by specifying no join fields
// handle join logic in Buffer implementation
userDistance = new CoGroup(lhsUserFeatures, Fields.NONE, rhsUserFeatures, Fields.NONE, new BufferJoin());
userDistance = new Every(userDistance, new EdgeDistanceBufferJoin(), Fields.RESULTS);


Thanks
Dilip

Dilip K

Aug 3, 2015, 4:19:54 AM
to cascading-user
Thanks a lot Ken and Chris, for all your suggestions and staying on top of the issue.

Passing the group fields as Fields.NONE to CoGroup gives the Cartesian product of the LHS and RHS pipes, but always with a single reducer. I tried a GroupBy on the pipes right before the CoGroup, and every possibility to retain the grouping on the pipes. Perhaps CoGroup needs a special case here, with an additional parameter for the grouping field, so it can process with multiple reducers.

Please let me know if there is any workaround or another way to perform a cross join in Cascading.

Thanks
Dilip 

Ken Krugler

Aug 4, 2015, 6:36:06 PM
to cascadi...@googlegroups.com
Hi Dilip,

I think you missed part of what I was suggesting. I had said:

An alternative approach would be to save the upstream result (what you get from the readCsvAssembly pipe), and then do a GroupBy on Fields.ALL with just that pipe.

That will give you one group per unique record (spread out over all of the reducers).

Then in your custom Buffer function (modified version of EdgeDistanceBufferJoin), read in the upstream result (in memory if possible, or reopen for each group) and calc the distance.

You ran into a problem using GroupBy with Fields.ALL, so then I suggested using field positions for some number of the fields, e.g.

        p = new GroupBy(p, new Fields(0, 1));

That would group on the first two fields. I wasn't suggesting trying to combine the GroupBy with a following CoGroup, which won't work.

You still have to do the work of creating a custom Buffer that's reading in a copy of the data (what is effectively the RHS), and calculating the distance metric (similarity) between each record in this RHS against every record in the group.

Setting the number of field positions such that you get # of groups roughly = or > the number of reduce tasks would be most efficient. Or you could inject a new field with a random value from 0…(num reduce tasks -1) and group on that, which is the approach I'd used in the past for this situation.
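A sketch of that last approach (the class and field name here are made up, not part of Cascading):

import java.util.Random;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class InsertRandomPartition extends BaseOperation implements Function {
    private final int numPartitions;
    private transient Random random;

    public InsertRandomPartition(int numPartitions) {
        super(new Fields("partition"));
        this.numPartitions = numPartitions;
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        if (random == null)
            random = new Random();

        // append a random partition number to every tuple
        functionCall.getOutputCollector().add(new Tuple(random.nextInt(numPartitions)));
    }
}

// usage: inject the field, then group on it
// pipe = new Each(pipe, new InsertRandomPartition(numReducers), Fields.ALL);
// pipe = new GroupBy(pipe, new Fields("partition"));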

-- Ken


From: Dilip K

Sent: August 3, 2015 1:19:53am PDT




Dilip K

Aug 5, 2015, 2:43:39 PM
to cascading-user
Thanks Ken. It seems I am missing something; I tried the following and the issue still persists. If you don't mind, can you review the following lines of code and suggest which one should be changed?

// Read tuples with all Fields from CSV
Function parseCsvFields = new ParseFeaturesCsvFields(getCsvFields(csvColumnCount));
readCsvAssembly = new Each(readCsvAssembly, new Fields("user-record"), parseCsvFields, Fields.RESULTS);

// split the pipe into two branches
Pipe lhsUserFeatures = new Pipe("lhs-user-features", readCsvAssembly);

// Apply GroupBy on RHS with specific fields or all
// Simply return the group of tuples with a custom buffer
Pipe rhsUserFeatures = new Pipe("rhs-user-features", readCsvAssembly);
rhsUserFeatures = new GroupBy(rhsUserFeatures, new Fields(1)); // rhsUserFeatures = new GroupBy(rhsUserFeatures, Fields.ALL);
rhsUserFeatures = new Every(rhsUserFeatures, new GroupUsersBufferJoin(getCsvFields(csvColumnCount)), Fields.RESULTS);

Pipe userDistance = new Pipe("feature-matrix");
// perform cross join by specifying no join fields
// handle join logic in custom Buffer implementation
userDistance = new CoGroup(lhsUserFeatures, Fields.NONE, rhsUserFeatures, Fields.NONE, new BufferJoin());
userDistance = new Every(userDistance, new EdgeDistanceBufferJoin(), Fields.RESULTS);

Thanks
Dilip
...

Ken Krugler

Aug 5, 2015, 2:49:25 PM
to cascadi...@googlegroups.com
Hi Dilip,

As per my email, I'd said "...I wasn't suggesting trying to combine the GroupBy with a following CoGroup, which won't work."

But it seems like that's what you're still trying to do in your code.

From my earlier email on July 31st:

An alternative approach would be to save the upstream result (what you get from the readCsvAssembly pipe), and then do a GroupBy on Fields.ALL with just that pipe.

That will give you one group per unique record (spread out over all of the reducers).

Then in your custom Buffer function (modified version of EdgeDistanceBufferJoin), read in the upstream result (in memory if possible, or reopen for each group) and calc the distance.

-- Ken


From: Dilip K

Sent: August 5, 2015 11:43:39am PDT




Dilip K

Aug 5, 2015, 3:34:35 PM
to cascading-user

Sorry for my ignorance. I was thinking that CoGroup is the only way to join two tuple streams and perform different kinds of joins.


Let me be clear.
For instance, if I have 10 users, I want to find the distance between each user and every other user. The end result should be 10*10 = 100 distances.

My understanding of the approach you suggested earlier:
Let's say the 10 users are divided into 3 groups (if grouping on a certain field) or 10 groups (if grouping on all fields).
A Buffer on a GroupBy (not a CoGroup) gives me access only to the tuples in that group, so I can't find distances to users in other groups.

I will end up with 3*3 + 3*3 + 4*4 = 34 distances.

That is the reason why I am still using the CoGroup buffer function.


Thanks for being patient and responsive.   
...

Ken Krugler

Aug 5, 2015, 4:15:44 PM
to cascadi...@googlegroups.com


From: Dilip K

Sent: August 5, 2015 12:34:34pm PDT

To: cascading-user

Subject: Re: run cascading job with multiple reducers


Sorry for my ignorance. I was thinking that CoGroup is the only way to join two tuple streams and perform different kinds of joins.


Let me be clear.
For instance, if I have 10 users, I want to find the distance between each user and every other user. The end result should be 10*10 = 100 distances.

My understanding of the approach you suggested earlier:
Let's say the 10 users are divided into 3 groups (if grouping on a certain field) or 10 groups (if grouping on all fields).
A Buffer on a GroupBy (not a CoGroup) gives me access only to the tuples in that group, so I can't find distances to users in other groups.

Correct. That's why I also said in my earlier email "... Then in your custom Buffer function (modified version of EdgeDistanceBufferJoin), read in the upstream result (in memory if possible, or reopen for each group) and calc the distance."

So your custom Buffer will need to explicitly open up the saved file in HDFS, read in all of the Tuples, and do the explicit comparison of each of those tuples (10, in your example above) with the tuples in the group (3 or 4).

Since a Cartesian product involves n^2/2 comparisons, the number of tuples will need to be small (e.g. 100K tuples means 5B comparisons), so you should be able to read them into memory unless the tuple size is really big.
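In sketch form (DistanceBuffer, savedCsvPath and the emitted fields are hypothetical, and the real similarity calculation is stubbed out):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.operation.OperationCall;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;

public class DistanceBuffer extends BaseOperation implements Buffer {
    private final String savedCsvPath;
    private final Fields savedFields;
    private transient List<Tuple> saved;

    public DistanceBuffer(String savedCsvPath, Fields savedFields) {
        super(new Fields("lhs-id", "rhs-id", "distance")); // hypothetical output fields
        this.savedCsvPath = savedCsvPath;
        this.savedFields = savedFields;
    }

    @Override
    public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
        // read the saved upstream result (the effective RHS) into memory,
        // once per reduce task
        try {
            Hfs tap = new Hfs(new TextDelimited(savedFields, ","), savedCsvPath);
            TupleEntryIterator iterator = flowProcess.openTapForRead(tap);
            saved = new ArrayList<Tuple>();

            while (iterator.hasNext())
                saved.add(iterator.next().getTupleCopy());

            iterator.close();
        } catch (IOException exception) {
            throw new RuntimeException("unable to read " + savedCsvPath, exception);
        }
    }

    @Override
    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        // compare every tuple in this group against every saved tuple
        Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

        while (arguments.hasNext()) {
            Tuple lhs = arguments.next().getTupleCopy();

            for (Tuple rhs : saved)
                bufferCall.getOutputCollector().add(distance(lhs, rhs));
        }
    }

    private Tuple distance(Tuple lhs, Tuple rhs) {
        // stub: emit both ids and a zero distance; the real calculation
        // (your EdgeDistance logic) goes here
        return new Tuple(lhs.getObject(0), rhs.getObject(0), 0.0d);
    }
}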

-- Ken




Dilip K

Aug 5, 2015, 6:02:11 PM
to cascading-user
Appreciate your help, Ken. I was thinking about a Cascading-based solution. Will try as suggested.
...

Ken Krugler

Aug 7, 2015, 2:51:40 PM
to cascadi...@googlegroups.com
See https://github.com/ScaleUnlimited/cascading.snippets/blob/master/src/main/java/com/scaleunlimited/snippets/DistributedCrossProduct.java

This is a SubAssembly that does a Cartesian join.


See the same repository for an example of using it.

Note that you'll need Cascading 2.7.1-wip-33 (or later), as Chris just put in a fix for using new Fields(-1) as the grouping key, which is the approach used when doing a self-cross product (your use case).
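For reference, the grouping shape that fix enables looks like this (lhs and rhs are placeholder pipe names, each carrying the injected random partition value as its last field):

// group both sides on their last field, then let the BufferJoin
// emit the cross product within each partition
Pipe joined = new CoGroup(lhs, new Fields(-1), rhs, new Fields(-1), new BufferJoin());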

-- Ken


From: Dilip K

Sent: August 5, 2015 5:02:11pm CDT




Dilip K

Aug 11, 2015, 1:20:46 PM
to cascading-user
That was so quick. Thanks for keeping me posted on this.
I tried using the snippet DistributedCrossProduct.java.

It works as expected using the LocalFlowConnector, but not with either the MR2 or Tez connectors.

Pipe distancePipe = new DistributedCrossProduct(readCsvAssembly, 15);
distancePipe = new Each(distancePipe, new Debug("joined", true));


2015-08-11 04:11:02,287 INFO [main] cascading.flow.hadoop.FlowMapper: sourcing from: Hfs["TextDelimited[['employee-record']]"]["/user/guest/feature-extraction-output.csv"]
2015-08-11 04:11:02,288 INFO [main] cascading.flow.hadoop.FlowMapper: sinking to: CoGroup(DistributedCrossProduct-lhs*DistributedCrossProduct-rhs)[by: DistributedCrossProduct-lhs:[{1}:-1] DistributedCrossProduct-rhs:[{1}:-1]]
2015-08-11 04:11:02,289 INFO [main] cascading.flow.hadoop.FlowMapper: flow node id: BE87D61BA76D4CE589EE04824FFD2031, mem on start (mb), free: 118, total: 1191, max: 1191
2015-08-11 04:11:02,409 ERROR [main] cascading.flow.stream.element.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.tuple.TupleException: failed to set a value beyond the end of the tuple elements array, size: 567 , index: -1
	at cascading.tuple.Tuple.internalSet(Tuple.java:638)
	at cascading.tuple.Tuple.set(Tuple.java:535)
	at cascading.tuple.Tuple.nulledCopy(Tuple.java:731)
	at cascading.tuple.Tuples.nulledCopy(Tuples.java:261)
	at cascading.flow.stream.element.GroupingSpliceGate$5.makeResult(GroupingSpliceGate.java:156)
	at cascading.flow.hadoop.stream.HadoopGroupGate.receive(HadoopGroupGate.java:97)
	at cascading.flow.hadoop.stream.HadoopGroupGate.receive(HadoopGroupGate.java:45)
	at cascading.flow.stream.element.FunctionEachStage$1.collect(FunctionEachStage.java:81)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
	at com.test.DistributedCrossProduct$RemoveFieldnames.operate(DistributedCrossProduct.java:136)


Thanks
Dilip

...

Ken Krugler

Aug 11, 2015, 2:56:01 PM
to cascadi...@googlegroups.com


From: Dilip K

Sent: August 11, 2015 10:20:46am PDT

To: cascading-user

Subject: Re: run cascading job with multiple reducers


That was so quick. Thanks for keeping me posted on this.
I tried using the snippet DistributedCrossProduct.java.

It works as expected using the LocalFlowConnector, but not with either the MR2 or Tez connectors.

And you've confirmed that you're using Cascading 2.7.1-wip-33 (or later), as per my email?

-- Ken



Dilip K

Aug 11, 2015, 3:17:13 PM
to cascading-user
Yes, I am using Cascading version 3.0.2-wip-142.
...

Andre Kelpe

Aug 11, 2015, 3:36:11 PM
to cascading-user
I don't think the fix has been forward-ported to the 3.0.2-wip line yet.

- André



Chris K Wensel

Aug 11, 2015, 3:37:44 PM
to cascadi...@googlegroups.com
I have not merged the 2.7.1-wip-33 changes into 3.0.2-wip yet; this only works with 2.7.1-wip-33.

Once the fix is confirmed, we will cascade a 2.7.1 final -> 3.0.2-wip -> 3.1-wip to sync everything up.

ckw



Dilip K

Aug 11, 2015, 8:11:42 PM
to cascading-user
I have switched back to Cascading 2.7.1-wip-33, which resolved the issue with Fields(-1).
But I have circled back to the number-of-reducers issue: the flow is still running with a single reducer. Please advise.


Here is my code.
// define source Tap of type CSV
Scheme sourceScheme = new TextDelimited(new Fields("employee-record"), true, "\n");
Tap source = new Hfs(sourceScheme, inPath);

// define destination Tap
Scheme sinkScheme = new TextDelimited(Fields.ALL, ",");
Tap sink = new Hfs(sinkScheme, outPath, SinkMode.REPLACE);

// the 'head' of the pipe assembly
Pipe readCsvAssembly = new Pipe("read-user-features");

// Retrieve each row
Function parseCsv = new ParseCsvLine();
readCsvAssembly = new Each(readCsvAssembly, new Fields("employee-record"), parseCsv, Fields.RESULTS);

// Retrieve all fields
Function parseCsvFields = new ParseFeaturesCsvFields(getCsvFields(csvColumnCount));
readCsvAssembly = new Each(readCsvAssembly, new Fields("user-record"), parseCsvFields, Fields.RESULTS);

// Grouping to spawn multiple reducers
readCsvAssembly = new GroupBy(readCsvAssembly, new Fields(1)); // Fields.ALL

// perform the cross join
// Pipe distancePipe = new DistributedCrossProduct(readCsvAssembly, 15);
// distancePipe = new Each(distancePipe, new Debug("joined",  true));


// Write each user to Hbase using User Key and distances as values
properties.put("mapreduce.job.reduces", 15);
AppProps.setApplicationJarClass( finalProperties, Main.class );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);
Flow persistUserDistancesFlow = flowConnector.connect( source, sink, readCsvAssembly );
persistUserDistancesFlow.complete();


Thanks
Dilip Kari

...

Ken Krugler

Aug 11, 2015, 8:20:10 PM
to cascadi...@googlegroups.com
Hi Dilip,

What is your understanding of what this line of code will do...

readCsvAssembly = new GroupBy(readCsvAssembly, new Fields(1)); // Fields.ALL

Regards,

-- Ken


From: Dilip K

Sent: August 11, 2015 5:11:42pm PDT


Dilip Kumar Kari

Aug 11, 2015, 8:41:19 PM
to cascadi...@googlegroups.com
This would group the tuples based on the second field, thereby spinning up a number of reducers equal to the number of groups. I am using it to test the basic flow with multiple reducers and to make sure configuration is not the issue. Even without the cross join, it runs with a single reduce task. I remember the same code running with multiple reducers on Tez.

Please correct me if I am wrong.

Ken Krugler

Aug 12, 2015, 9:50:35 AM
to cascadi...@googlegroups.com


From: Dilip Kumar Kari

Sent: August 11, 2015 5:41:17pm PDT

To: cascadi...@googlegroups.com

Subject: Re: run cascading job with multiple reducers


This would group the tuples based on the second field, thereby spinning up a number of reducers equal to the number of groups.

It wouldn't spin up that many reducers - it would partition the groups across the configured number of reducers using the hash of the field value.

But yes, if you were sure there were enough unique values for the second field (and thus enough groups) then this would be partitioned such that every reducer was active for some amount of time.
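For reference, the default Hadoop partitioning logic is just a hash mod the reducer count, so few unique keys means few busy reducers:

public class PartitionDemo {
    // same logic as org.apache.hadoop.mapred.lib.HashPartitioner
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // a single distinct grouping key always lands on the same
        // reducer, no matter how many reducers are configured
        System.out.println(partitionFor("only-key", 15));
    }
}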

I am using it to test the basic flow with multiple reducers and to make sure configuration is not the issue. Even without the cross join, it runs with a single reduce task. I remember the same code running with multiple reducers on Tez.

At this point I'd use the Hadoop GUI to look at your actual job conf to make sure the relevant properties are set correctly given the version of Hadoop you're using on the cluster, and that you have sufficient groups.

I did notice that you're setting finalProperties in your code, but using "properties" when connecting the flow.

Good luck,

-- Ken



Dilip K

Aug 12, 2015, 11:03:17 AM
to cascading-user
Hi Ken,

Thanks for all the clarifications. I figured out the issue; it was my mistake.
properties.put("mapreduce.job.reduces", 15);
The property value should be a String rather than an int. I changed the above line to properties.put("mapreduce.job.reduces", Integer.toString( 15 ));
Now the job runs with 15 reducers.

Will run the job with cross join and let you know the status.



Thanks
Dilip Kari

...

Andre Kelpe

Aug 12, 2015, 11:09:05 AM
to cascading-user
This is a classic error b/c the Properties object is of type Map<Object, Object>, but ignores everything that isn't a String. Retrofitting generics ftw! I have run into this myself; it is super easy to fix, but takes time to figure out...
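A quick demonstration of the trap:

import java.util.Properties;

public class PropertiesGotcha {
    public static void main(String[] args) {
        Properties props = new Properties();

        props.put("mapreduce.job.reduces", 15); // accepted, but invisible below
        System.out.println(props.getProperty("mapreduce.job.reduces")); // prints null

        props.put("mapreduce.job.reduces", Integer.toString(15));
        System.out.println(props.getProperty("mapreduce.job.reduces")); // prints 15
    }
}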

- André


...


Dilip K

Aug 12, 2015, 5:35:27 PM
to cascading-user

I did some basic testing on the new enhancement made for the Cartesian join. Everything is working great.

One minor issue I see: the InsertRandom(numReducers) used in DistributedCrossProduct generates arbitrary values, so some reducers get no tuples, some get only a few, and those reducers complete far ahead of the others. Equally distributing tuples across all reducers would make more sense.

 

Thanks

Dilip  

...

Ken Krugler

Aug 12, 2015, 6:58:23 PM
to cascadi...@googlegroups.com
If you have any reasonable number of tuples, then the distribution of values should be roughly even.

So I have to assume you're seeing this with a test case where you only have a very small number of tuples, yes?

Leaving that aside, yes, maintaining state and having it cycle through the values would remove this issue.
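A sketch of that stateful variant (the class and field name are made up; each map task keeps its own counter, which is fine since every task cycles through all the values):

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class InsertRoundRobinPartition extends BaseOperation implements Function {
    private final int numPartitions;
    private transient int next;

    public InsertRoundRobinPartition(int numPartitions) {
        super(new Fields("partition"));
        this.numPartitions = numPartitions;
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
        // cycle 0, 1, ..., numPartitions-1 instead of drawing random
        // values, spreading tuples evenly across the partitions
        functionCall.getOutputCollector().add(new Tuple(next));
        next = (next + 1) % numPartitions;
    }
}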

-- Ken


From: Dilip K

Sent: August 12, 2015 2:35:27pm PDT


Dilip K

Aug 12, 2015, 7:35:22 PM
to cascading-user
Ken,

This is happening even with a large number of tuples. I tried with both 15 tuples and 3000 tuples.
In the 3000-tuple case, configured for 15 reducers, it creates 15 reducers, but 7 always get 0 tuples, and 4 of them finished when the remaining 4 were at 30%.

[dkari@hdpe001 ~]$ hadoop fs -ls /tmp/hadoop-dkari/nd_output
Found 16 items
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:33 /tmp/hadoop-dkari/nd_output/_SUCCESS
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:29 /tmp/hadoop-dkari/nd_output/part-00000
-rw-r--r--   3 dkari hdfs  738831260 2015-08-12 10:30 /tmp/hadoop-dkari/nd_output/part-00001
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:29 /tmp/hadoop-dkari/nd_output/part-00002
-rw-r--r--   3 dkari hdfs 1522189266 2015-08-12 10:31 /tmp/hadoop-dkari/nd_output/part-00003
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:29 /tmp/hadoop-dkari/nd_output/part-00004
-rw-r--r--   3 dkari hdfs 3076367232 2015-08-12 10:33 /tmp/hadoop-dkari/nd_output/part-00005
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:29 /tmp/hadoop-dkari/nd_output/part-00006
-rw-r--r--   3 dkari hdfs 2240010960 2015-08-12 10:32 /tmp/hadoop-dkari/nd_output/part-00007
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:29 /tmp/hadoop-dkari/nd_output/part-00008
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:29 /tmp/hadoop-dkari/nd_output/part-00009
-rw-r--r--   3 dkari hdfs  860367846 2015-08-12 10:30 /tmp/hadoop-dkari/nd_output/part-00010
-rw-r--r--   3 dkari hdfs 1360468596 2015-08-12 10:31 /tmp/hadoop-dkari/nd_output/part-00011
-rw-r--r--   3 dkari hdfs  799276638 2015-08-12 10:30 /tmp/hadoop-dkari/nd_output/part-00012
-rw-r--r--   3 dkari hdfs  743276364 2015-08-12 10:30 /tmp/hadoop-dkari/nd_output/part-00013
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:29 /tmp/hadoop-dkari/nd_output/part-00014


[dkari@hdpe001 ~]$ hadoop fs -ls /tmp/hadoop-dkari/nd_output
Found 16 items
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:44 /tmp/hadoop-dkari/nd_output/_SUCCESS
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:40 /tmp/hadoop-dkari/nd_output/part-00000
-rw-r--r--   3 dkari hdfs  708258932 2015-08-12 10:41 /tmp/hadoop-dkari/nd_output/part-00001
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:40 /tmp/hadoop-dkari/nd_output/part-00002
-rw-r--r--   3 dkari hdfs 1481461794 2015-08-12 10:42 /tmp/hadoop-dkari/nd_output/part-00003
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:40 /tmp/hadoop-dkari/nd_output/part-00004
-rw-r--r--   3 dkari hdfs 3116956630 2015-08-12 10:44 /tmp/hadoop-dkari/nd_output/part-00005
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:40 /tmp/hadoop-dkari/nd_output/part-00006
-rw-r--r--   3 dkari hdfs 2341856364 2015-08-12 10:43 /tmp/hadoop-dkari/nd_output/part-00007
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:40 /tmp/hadoop-dkari/nd_output/part-00008
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:40 /tmp/hadoop-dkari/nd_output/part-00009
-rw-r--r--   3 dkari hdfs  758549166 2015-08-12 10:41 /tmp/hadoop-dkari/nd_output/part-00010
-rw-r--r--   3 dkari hdfs 1477662520 2015-08-12 10:42 /tmp/hadoop-dkari/nd_output/part-00011
-rw-r--r--   3 dkari hdfs  738185430 2015-08-12 10:41 /tmp/hadoop-dkari/nd_output/part-00012
-rw-r--r--   3 dkari hdfs  717821694 2015-08-12 10:41 /tmp/hadoop-dkari/nd_output/part-00013
-rw-r--r--   3 dkari hdfs          0 2015-08-12 10:40 /tmp/hadoop-dkari/nd_output/part-00014



Thanks
Dilip
...