Configuring parallelism on a per-task basis for Tez

Piyush Narang

Aug 2, 2016, 6:10:59 PM
to cascadi...@googlegroups.com

Hi folks,

I was testing out some Scalding (on Cascading 3.2 using Tez) code that has a couple of group clauses and wasn't able to figure out how one could set different parallelism for different Tez vertices in Cascading.

I noticed that when we end up with a Tez graph, all the 'reducer' / scatter-gather vertices get a parallelism of 1. This seems to be because we set cascading.flow.runtime.gather.partitions.num = 1 on our groupAll in Scalding.
This used to work fine in the MR setup, where these steps end up as separate jobs, but on Tez it seems that a value like gather partitions, once set, ends up being used for all scatter-gather vertices.

Is there a recommended way to set parallelism for individual Tez vertices / Cascading nodes? Or do we need to use the Tez parallelism settings directly? Any other recommendations / suggestions?

Here’s the code at a high level:

myTypedPipe.flatMap( tweet => tweet.getText.split("\\s+") )
  .filter(_.contains("#"))
  .map(_.toLowerCase)
  .map(word => (word, 1L))
  .sumByKey // first groupBy, would like higher parallelism here
  .toTypedPipe 
  .aggregate(top10000) // groupAll, this can be parallelism of 1
  .flatten
  .write(TypedTsv("hashtag_output"))

Thanks,
 
--
- Piyush

Andre Kelpe

Aug 3, 2016, 6:25:40 AM
to cascading-user
You can set a configuration option per node and/or per step by putting it
in the corresponding ConfigDef at the pipe level:

http://docs.cascading.org/cascading/3.1/javadoc/cascading-core/cascading/property/ConfigDef.html

http://docs.cascading.org/cascading/3.1/javadoc/cascading-core/cascading/pipe/Pipe.html#getConfigDef()
http://docs.cascading.org/cascading/3.1/javadoc/cascading-core/cascading/pipe/Pipe.html#hasNodeConfigDef()
http://docs.cascading.org/cascading/3.1/javadoc/cascading-core/cascading/pipe/Pipe.html#getStepConfigDef()

- André



--
André Kelpe
an...@concurrentinc.com
http://concurrentinc.com
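A minimal Scala sketch of the per-node approach described above, written against the Cascading 3 pipe-assembly API (Pipe, GroupBy, Fields) rather than Scalding's typed API. It assumes that the gather-partitions key from the first post is what Tez reads for a grouping's vertex parallelism, and that attaching it to a GroupBy's node ConfigDef scopes it to that node only; the thread does not confirm either point. The object, pipe and field names and the partition counts are hypothetical, and the Every aggregations that would normally follow each GroupBy are omitted.

```scala
import cascading.pipe.{GroupBy, Pipe}
import cascading.tuple.Fields

object PerNodeGatherPartitions {
  // Key quoted in the first post; here it is attached per pipe, not flow-wide.
  val GatherPartitions = "cascading.flow.runtime.gather.partitions.num"

  // Builds a named GroupBy and records the partition count in its node
  // ConfigDef (assumption: this limits the value to that grouping's node).
  def groupWithPartitions(name: String, upstream: Pipe, key: Fields, partitions: Int): GroupBy = {
    val grouped = new GroupBy(name, upstream, key)
    grouped.getNodeConfigDef().setProperty(GatherPartitions, partitions.toString)
    grouped
  }

  // Mirrors the shape of the job in the first post; names and counts are hypothetical.
  def assemble(): (GroupBy, GroupBy) = {
    val words = new Pipe("words")
    // Per-word grouping (the sumByKey): ask for wide parallelism.
    val byWord = groupWithPartitions("byWord", words, new Fields("word"), 64)
    // Group-all on Fields.NONE (the top-N aggregate): one partition is enough.
    val all = groupWithPartitions("all", byWord, Fields.NONE, 1)
    (byWord, all)
  }
}
```

Each grouping carries its own value, so nothing flow-wide or step-wide has to be set to 1.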

Piyush Narang

Aug 3, 2016, 5:10:55 PM
to cascading-user
Thanks for getting back to me, André. I think we were setting this on the StepConfigDef instead of the NodeConfigDef. I guess that since those settings are applied to the whole 'process step', every Tez vertex in that step ends up with the same value. I'll try the NodeConfigDef on a Tez run; based on the docs you linked, it seems like it might do the trick.

Do you know if using the NodeConfigDef to set the number of reducers would also be a good idea for Hadoop? If possible, it would be nice to use the same code on Tez and Hadoop to set the number of reducers for a given group operation.

Thanks,
Piyush

Andre Kelpe

Aug 4, 2016, 5:43:06 AM
to cascading-user
It works the same way, so if you know that a given number of reducers
for a given node is better than the global setting, you should set it.
ConfigDef is meant for those "expert-level" overrides.

- André


Piyush Narang

Aug 4, 2016, 6:01:53 PM
to cascading-user
Thanks, Andre. Looking at the code, it seems like nodeConfigDef isn't being initialized in MR (but stepConfigDef is):
2) Init being done for stepConfigDef in HadoopFlowStep: https://github.com/cwensel/cascading/blob/wip-3.1/cascading-hadoop/src/main/shared-mr1/cascading/flow/hadoop/HadoopFlowStep.java#L124

So for Hadoop, setting only the nodeConfigDef won't cut it, right? We'll also need to set the stepConfigDef? (I might be missing something; I'm not super familiar with the code.)

Thanks,

Chris K Wensel

Aug 4, 2016, 7:18:52 PM
to cascadi...@googlegroups.com
It is not possible to set node-level configuration in MapReduce, since there is really only one JobConf for both the map and the reduce. Set the step instead.

If someone wants to contribute a patch that converts our parallelism config key, as seen in a current NodeConfigDef, to the map or reduce equivalent, that would be great.


Piyush Narang

Aug 4, 2016, 7:28:23 PM
to cascading-user
Thanks for confirming, Chris. I just tried an MR run (with parallelism set in the nodeConfigDef), and it confirms that the setting isn't picked up. We'll probably end up with a forked setup for now. I'll discuss this a bit more with some of the other Scalding devs as well.

Thanks,
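One possible shape for such a forked setup, as a hedged Scala sketch: on MapReduce the value goes into the step ConfigDef (per Chris, the only level MR honors, since map and reduce share one JobConf), and on Tez into the node ConfigDef so it does not apply to every scatter-gather vertex in the step. The `ExecPlatform` flag, `ForkedParallelism` and `setGatherPartitions` are hypothetical names; the application has to decide for itself which platform it runs on. Reusing the gather-partitions key on MapReduce follows the first post, which reports that it worked there when set at the step level.

```scala
import cascading.pipe.Pipe

// Hypothetical platform flag; neither Cascading nor Scalding defines this type.
sealed trait ExecPlatform
case object MapReduce extends ExecPlatform
case object Tez extends ExecPlatform

object ForkedParallelism {
  val GatherPartitions = "cascading.flow.runtime.gather.partitions.num"

  // Records the requested partition count on the ConfigDef level that the
  // given platform is expected to honor, then returns the same pipe.
  def setGatherPartitions(pipe: Pipe, partitions: Int, platform: ExecPlatform): Pipe = {
    val configDef = platform match {
      // MR: one JobConf per step, so node-level settings are not picked up.
      case MapReduce => pipe.getStepConfigDef()
      // Tez: a step spans several vertices, so scope the value to this node.
      case Tez => pipe.getNodeConfigDef()
    }
    configDef.setProperty(GatherPartitions, partitions.toString)
    pipe
  }
}
```

Keeping the platform switch in one helper lets the grouping code itself stay shared between Tez and Hadoop; only the ConfigDef level differs.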