[cascading3 branch] descriptions & parallelism


Cyrille Chépélov

Oct 11, 2016, 1:42:57 PM
to scaldi...@googlegroups.com
Hi,

I'm trying to tie up a few loose ends in the way step descriptions (text typically passed via .withDescriptions(...)) and the desired level of parallelism (typically passed via .withReducers(N)) are pushed onto the various fabrics.

Right now:
  • Most of the scalding code base either ignores the back-end (good) or assumes the world is either Local or HadoopFlow (which covers Hadoop 1.x and MR1). As a consequence, a couple of things don't yet work smoothly on Tez and, I assume, on Flink.
  • the descriptions are dropped entirely if not running on Hadoop1 or MR1
  • .withReducers sets a Hadoop-specific property (mapred.reduce.tasks) at RichPipe#L41
  • the Tez fabric ignores .withReducers, and there is currently no other conduit to set the desired number of parts on the sinks. As a consequence, you can't run a Tez DAG with a high level of parallelism and a small (single) number of output files (e.g. stats leading to a result file of a couple dozen lines); you have to pick one and set cascading.flow.runtime.gather.partitions.num. There are workarounds, but they're quite ugly.
  • there are a few instances of "flow match { case HadoopFlow => doSomething ; case _ => () }" scattered around the code
  • there is some heavily reflection-based code in Mode.scala which depends on jars that are not part of the scalding build process (and it's good that these jars stay out of the scalding-core build, e.g. the Tez client libraries)
  • while it may be desirable to experiment with scalding-specific transform registries for cascading (e.g. to deal with the Merge-GroupBy structure, or to perform tests/assertions on the resulting flow graph), it would be impractical to perform the necessary fabric-specific adjustments in Mode.scala as it stands.

I'm trying to find a way to extract away these MR-isms and push them into fabric-specific code that can be called when appropriate.
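To make this concrete, here is a minimal sketch of the shape these MR-isms currently take; it is simplified, not the actual scalding source, and the description property key is illustrative:

```scala
// Minimal sketch of the MR-isms described above (simplified, not the real scalding code).
import cascading.flow.Flow
import cascading.flow.hadoop.HadoopFlow
import cascading.pipe.Pipe

object MrIsms {
  // Descriptions only survive if the flow happens to be a HadoopFlow;
  // on Tez or Flink they are silently dropped.
  def setDescriptions(flow: Flow[_], descriptions: Seq[String]): Unit =
    flow match {
      case hf: HadoopFlow =>
        hf.getConfig.set("scalding.step.descriptions", descriptions.mkString(",")) // key illustrative
      case _ => ()
    }

  // .withReducers(n) boils down to a Hadoop MR property on the step's ConfigDef.
  def setReducers(pipe: Pipe, n: Int): Pipe = {
    pipe.getStepConfigDef.setProperty("mapred.reduce.tasks", n.toString)
    pipe
  }
}
```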

Questions:

  1. Would it be appropriate to start having fabric-specific jars (scalding-fabric-hadoop, scalding-fabric-hadoop2-mr1, scalding-fabric-tez, etc.) and to push the fabric-specific code from Mode.scala there?

    (we'd keep only a single fabric-related factory using reflection, with the appropriate interfaces defined in scalding-core; a rough sketch of what I mean follows after these questions)

  2. Pushing the fabric-specific code into dedicated jars would probably have user-visible consequences, as we can't make scalding-core depend on scalding-fabric-hadoop (for back-compatibility) unless the fabric-factory interface goes into another jar.

    From my point of view, intentionally breaking the build slightly, once, upon upgrade, for the purpose of letting the world know that there are other fabrics than MR1 might be acceptable; then again, I haven't used MR1 for over a year.

    Is this "slight" dependency breakage acceptable, or is it better to have scalding-core still imply the hadoop fabrics?

  3. Right now, scalding's internals sometimes use Hadoop (MR) specifics to carry various configuration values. Is it acceptable to continue doing so (at least in the beginning), kindly asking the respective non-Hadoop fabrics to pick these values up and convert them to the relevant APIs?

  4. Is it okay to drop the @deprecated(..., "0.12.0") functions from Mode.scala if they are inconvenient to carry over in the process?

  5. Currently, Job.buildFlow returns Flow[_]. Is it okay to have it return Flow[_] with ScaldingFlowGoodies instead, ScaldingFlowGoodies being the provisional name of the interface into which the old "flow match { case HadoopFlow => ... }" code would move?
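To make questions 1 and 5 a bit more concrete, here is a very rough sketch of the shape I have in mind; every name below (FabricSupport, ScaldingFlowGoodies, the reflection-based lookup) is provisional and purely illustrative:

```scala
// Provisional sketch only; all names are placeholders, nothing here is final.
import cascading.flow.{ Flow, FlowConnector }

// Would live in scalding-core (or in a small fabric-api jar, cf. question 2):
trait ScaldingFlowGoodies {
  def setFlowDescriptions(descriptions: Seq[String]): Unit
  def setSinkParallelism(parts: Int): Unit
}

trait FabricSupport {
  def newFlowConnector(props: Map[AnyRef, AnyRef]): FlowConnector
  def wrap(flow: Flow[_]): Flow[_] with ScaldingFlowGoodies
}

// The single remaining piece of reflection: load whichever fabric jar is on the classpath.
object FabricSupport {
  def forName(className: String): FabricSupport =
    Class.forName(className).newInstance().asInstanceOf[FabricSupport]
}
```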
Thanks in advance

    -- Cyrille

Piyush Narang

Oct 11, 2016, 8:23:47 PM
to Cyrille Chépélov, scaldi...@googlegroups.com
We ran into similar problems while trying to set the number of reducers when testing out Cascading3 on Tez. We hacked around it temporarily but haven't yet cleaned up that code and put it out for review (we'll need to fork the MR / Tez paths there, as nodeConfigDef works for Tez but not for Hadoop). Based on my understanding, so far we've tried to delegate as much of this to Cascading as we can, but there seem to be a few places where we're doing platform-specific stuff in Scalding. Breaking things up into fabric-specific sub-modules seems like a nice idea to me. We might need to think through the right way to do this to ensure we don't break stuff. Would it make sense to spin up an issue so we can discuss there?
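For reference, our temporary hack looks roughly like this (reconstructed from memory and simplified; the property name is the one Cyrille mentions above, and only fabrics that honour node-level config, i.e. Tez, pick it up):

```scala
// Rough sketch of the temporary hack (simplified, from memory).
import cascading.pipe.Pipe

object TezParallelismHack {
  def setNodeParallelism(pipe: Pipe, parts: Int): Pipe = {
    // Tez reads node-level config; the MR fabrics ignore it, hence the need to fork MR / Tez.
    pipe.getNodeConfigDef.setProperty(
      "cascading.flow.runtime.gather.partitions.num",
      parts.toString)
    pipe
  }
}
```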

--
- Piyush

Oscar Boykin

Oct 11, 2016, 9:30:04 PM
to Piyush Narang, Cyrille Chépélov, scaldi...@googlegroups.com
Generally, I think this is a good idea also (separate modules for fabrics).

I agree that Mode and Job are a bit hairy in spots. I think we can remove some deprecated code if it makes life significantly easier, but source and binary compatibility should be kept as much as we can reasonably manage.

I would actually really rather `buildFlow` be private[scalding] but maybe that is too much. Making it return a subclass of Flow seems like a fine idea to me at the moment.

Breaking hadoop out of scalding-core seems pretty hard since `Source` has it baked in at a few spots. That said, the Source abstractions in scalding are not very great. If we could improve that (without removing support for the old stuff) it might be worth it. Many have complained about Source's design over the years, but we have not really had a full proposal that seems to address all the concerns.

The desire for jobs to look the same across all fabrics makes modularization a bit ugly.

Cyrille Chépélov

Oct 12, 2016, 5:39:23 AM
to scaldi...@googlegroups.com
Oscar, Piyush,

thanks for the feedback!

At the moment, I'm not sure it's realistic to break the dependency on "hadoop" completely out of scalding-core. As an intermediate goal, I'd shoot for at least soft-removing the assumption that processing happens on Hadoop MapReduce; the storage interface will pretty much remain HDFS for the time being (IOW, I'll leave Source essentially unchanged in scalding-core).

Meanwhile, I'm taking the messages here and on the gitter channel as a positive signal towards the principle of scalding-$FABRIC sub-modules, and will start working on that in the background.

    -- Cyrille

Oscar Boykin

Oct 12, 2016, 5:11:39 PM
to Cyrille Chépélov, scaldi...@googlegroups.com
sounds great!

Alex Levenson

Oct 14, 2016, 4:48:02 PM
to Oscar Boykin, Cyrille Chépélov, scaldi...@googlegroups.com
This is a large enough change, one that probably won't fit into a single PR, that it might merit some sort of design doc / written plan. That way we can agree on a plan and then start implementing it piece by piece across a few PRs.

--
Alex Levenson
@THISWILLWORK

Cyrille Chépélov

Nov 4, 2016, 2:43:06 PM
to scaldi...@googlegroups.com
Hi there,

Some progress on the "separation of fabrics" project:

    TL;DR: I have a branch here, https://github.com/cchepelov/scalding/tree/split-fabrics, that is mostly working on Hadoop, Hadoop2-MR1 and Tez, with baby steps on Flink.

The Good
  • scalding-core has dependencies on Hadoop for HDFS, but no longer has explicit dependencies on MAPREDUCE
  • One can switch between MAPREDUCE using the legacy hadoop1 API and MAPREDUCE using Cascading's hadoop2-mr1 fabric
  • Most tests run on all four available fabrics in addition to Local. That is: Legacy Hadoop, Hadoop2-MR1, Tez, and Flink.
  • Switching from one fabric to another is a matter of supplying the appropriate fabric jar (scalding-fabric-hadoop, scalding-fabric-tez, etc.) in your assembly (see the sbt sketch after this list)
  • Even the REPL seems to accept using a different fabric (!)
  • Having an explicit per-fabric bit of code within Scalding enables experimentation with more advanced things, such as implementing scalding-specific Cascading Planner graph transforms, as Chris advises.
  • I think I didn't break any widely-used API at the source level.
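For instance, in a user project's sbt build the swap boils down to something like this (version number purely illustrative; the Flink module name is my assumption of how it will eventually land):

```scala
// build.sbt of a user project; pick exactly one fabric jar.
// scalding-core itself no longer drags MAPREDUCE in.
libraryDependencies ++= Seq(
  "com.twitter" %% "scalding-core"       % "0.17.0-SNAPSHOT",
  "com.twitter" %% "scalding-fabric-tez" % "0.17.0-SNAPSHOT"
  // or: "com.twitter" %% "scalding-fabric-hadoop"      % "0.17.0-SNAPSHOT"
  // or: "com.twitter" %% "scalding-fabric-hadoop2-mr1" % "0.17.0-SNAPSHOT"
  // or (eventually): "com.twitter" %% "scalding-fabric-flink" % "0.17.0-SNAPSHOT"
)
```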
The Bad
  • I think I didn't break any widely-used API at the source level, but I haven't (yet) checked whether any damage control should or can be done
  • A few tests still break on Tez. These are things I've lived with for a long time, but fixing them should be easier, and a higher priority, now. For now there seem to be two outstanding items left: 1. mapping .withReducers all the way down to the level of parallelism of the TezChild node in charge of performing that processing, and 2. perhaps a planner bug, or perhaps a missing scalding-specific planner transform, to handle jobs involving Sketched Joins (that's on cascading 3.2-wip-6)
  • Flink is not yet ready for prime time. At the moment, I'm building it against a local snapshot reflecting https://github.com/dataArtisans/cascading-flink/pull/70, which is required because some of Cascading's internal interfaces have changed a bit since 3.1.
    Some of the tests are bound to fail for now, as cascading-flink cannot yet map some variants of hash joins (right outer hash joins, for instance).
  • Mode.scala is a mess and will need a little bit of clean-up
  • There are still a couple of tests that are bound to fail
  • Any test that was pattern matching on the exact type of Mode (Test vs. Hadoop vs. Local vs. HadoopTest) will fail, and there is no solution for that
  • Tez and Flink tests seem quite slow. I'm not yet sure what's happening; it seems some of the code is simply waiting, and waking up long after a given test job is complete.

The Ugly

  • Mode.scala is a mess and will really need a little bit of clean-up

  • we still need to compile scalding-core with a provided dependency on either cascading-hadoop or cascading-hadoop2-mr1. This is due to HadoopTap and friends (HDFS support). Ideally we could have a (perhaps hard?) dependency on cascading-hadoop2-io, since everyone uses it (hadoop2-mr1, tez, flink), but we'd have to manage the case of cascading-hadoop (which brings almost identical copies of those classes but, by its nature, cannot depend on cascading-hadoop2-io). I'm still slightly confused about the best course of action; I'd like things in scalding-core to actually fail to compile if they still accidentally depend on MAPREDUCE, but I'm not sure that's achievable as it is.

  • I've tried to move the fabric-sensitive tests from scalding-core into a pool of tests that is shared and verified against all fabrics: this is scalding-core-fabric-tests

    Although ScalaTest's internal discovery seems happy to run anything that looks like a test, the discovery module used by "sbt test" is different. It only looks at tests implemented within the current project, specifically ignoring tests inherited from dependencies.

    I failed to find a way to convince sbt to adopt ScalaTest's discovery pattern. As a result, I've moved the "shared" tests from scalding-core-fabric-tests into another subdirectory of src/, which is referenced by all four fabrics as their own (see the build excerpt after this list). Consequently, this code is compiled four times, and IntelliJ can get confused and refuse to step into it.

    If there is an sbt guru around willing to give me a hand on this, I'd be really grateful.

  • Making the counter implementation dependent on the fabric required passing a class name through fabric-specific properties, then using reflection to instantiate it.
  • The smart tricks needed to make JobTest work and mock out taps, which can be LocalTaps or HadoopTaps pretty much at will.
  • I couldn't really wrap my head around enough of this by planning/designing first, without actually digging in. Some documentation, and possibly a restart from scratch, might be needed after all.
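Concretely, the shared-tests hack mentioned above amounts to something like the following in the build definition (simplified excerpt; the scaldingCore reference is assumed to be defined elsewhere in the build, and paths are approximate):

```scala
// Each fabric module compiles the shared fabric tests as its own sources,
// because sbt's test discovery ignores tests coming from a dependency.
lazy val scaldingFabricTez = Project("scalding-fabric-tez", file("scalding-fabric-tez"))
  .settings(
    unmanagedSourceDirectories in Test +=
      baseDirectory.value / ".." / "scalding-core-fabric-tests" / "src" / "fabric" / "scala"
  )
  .dependsOn(scaldingCore % "compile->compile;test->test")
```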

Things I'm inclined to kick to "later": can we also abstract storage away from "necessarily HDFS"? Would that be useful?

On the other hand, as the (Storage Mode) x (Execution Mode) x (Data Scheme) support matrix can be daunting, it may be useful to keep assuming that everything is HDFS, except when it's on LocalTaps, which can sometimes be a HadoopTap-and-a-wrapper or the other way around.

Next steps: incorporate feedback, clean up, fix the outstanding issues in scalding-fabric-tez (and fix Flink in due time), keep current with the develop/cascading3 branches, then figure out how to mainstream this (probably, indeed, by breaking up what can be broken up into individual PRs, though I'm afraid there will still be one big atomic change at some point).

For now this is just a branch; would it make sense to open an "RFC only" PR to enable the review tools?

    -- Cyrille


Oscar Boykin

Nov 4, 2016, 2:59:05 PM
to Cyrille Chépélov, scaldi...@googlegroups.com
Thanks for taking on this project. I'm excited about it.

Can you go ahead and make a WIP PR so we can see what the diff looks like and start giving feedback?

I'll be reviewing the WIP PR carefully.


Cyrille Chépélov

Nov 4, 2016, 3:57:09 PM
to scaldi...@googlegroups.com
Thanks!

That's PR #1617: https://github.com/twitter/scalding/pull/1617

Of note is scalding-core-fabric-tests/src/fabric/scala/com/twitter/scalding/CoreTestFabric.scala, which is really anything in scalding-core/src/test/scala/com/twitter/scalding/CoreTest.scala that needed to .runHadoop.

The real fun starts in Mode.scala, Source.scala and FileSource.scala
   
    -- Cyrille

Piyush Narang

Nov 4, 2016, 4:19:26 PM
to Cyrille Chépélov, Scalding Development
This is pretty cool! I'll take a look at the WIP PR as well.



--
- Piyush