Controlling Spark using Luigi?


staffan....@gmail.com

Feb 25, 2015, 4:38:31 AM
to luigi...@googlegroups.com
Hi,
I'm wondering if it would be possible to use Luigi as a workflow tool to control several Spark jobs. My application currently first runs a big Spark job over a complete dataset, which creates several RDDs (let's say a few thousand). Then I use concurrent Scala code to run new jobs on each of the thousands of RDDs that I got in the first step.

Could I use Luigi for this instead? Would that mean that I would have to write the output of the initial job to disk and then read it in again, or can you pass RDDs between different steps in Luigi? The later jobs are totally independent and should thus be run concurrently, meaning that all of them must share the same SparkContext (but just start new jobs in separate threads). Is this possible as well?

I've looked for anyone who has done this before, but haven't found anything. Is there any information about the subject that I can read?

Best,
Staffan

Ron Reiter

Feb 25, 2015, 4:58:40 AM
to staffan....@gmail.com, luigi...@googlegroups.com
I'll let Erik answer this one, but IMHO you can do two things:

1) Run Luigi as a separate Python process which runs a Spark Python job, and then the targets would be files
2) Run Luigi on the cluster using the Spark Python interpreter, using the spark context wherever you need it, and create targets or task outputs which are actually RDDs

Both should work I guess.
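
A rough sketch of what option 1 could look like, for illustration only. To keep it runnable without a cluster it does not import Luigi; the class just mimics the output()/run() shape of a Luigi task using the standard library. The script name passed as `app`, its `--output` argument, and the `runner` parameter are assumptions made up for this example. The `_SUCCESS` marker follows the Hadoop convention of writing a flag file when a job finishes.

```python
import subprocess
from pathlib import Path


class SparkJobStep:
    """Mimics the shape of a Luigi task: output() names a file, run() produces it."""

    def __init__(self, app, output_dir, master="local[*]"):
        self.app = app                      # hypothetical PySpark script
        self.output_dir = Path(output_dir)  # where the job writes its result
        self.master = master

    def output(self):
        # The "target" is a marker file that appears once the job has finished.
        return self.output_dir / "_SUCCESS"

    def complete(self):
        return self.output().exists()

    def command(self):
        # --master is a spark-submit option; --output is an assumed
        # argument of the script itself.
        return ["spark-submit", "--master", self.master,
                self.app, "--output", str(self.output_dir)]

    def run(self, runner=subprocess.check_call):
        # Skip the Spark job when its target already exists.
        if self.complete():
            return False
        runner(self.command())
        return True
```

In a real setup the same three pieces (a file target, a completeness check, a spark-submit call) would live in an actual Luigi task, and Luigi would do the skipping for you.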

Thanks,

Ron



neha...@gmail.com

May 11, 2016, 3:42:44 PM
to Luigi, staffan....@gmail.com
Hey all, jumping in a bit late on this thread, but I wanted to know if there was any follow-up discussion or examples on option 2? That's a workflow we're interested in establishing, and it would be great to see some more discussion and examples of implementing it.

Thanks!


Will Jones

May 13, 2016, 11:06:35 AM
to Luigi, staffan....@gmail.com, neha...@gmail.com
I use Spark with Luigi quite a bit, but haven't run into a scenario where I'd need to share a Spark context between Luigi tasks, so I don't really understand your use case too well. But to guess: is that so you can persist the RDD so it doesn't have to be loaded from storage for each of those thousands of jobs?

I might just continue to use scala in that case, and just use luigi to orchestrate the jobs.

neha...@gmail.com

May 13, 2016, 11:28:49 AM
to Luigi, staffan....@gmail.com, neha...@gmail.com
Hi Will, that's correct. We generated various file outputs that are filtered subsets of output from a large spark job (as a PySpark DataFrame). We'd like to reuse that object in memory instead of having to write and read for each job (thus the need to pass around the SparkContext).

> I might just continue to use scala in that case, and just use luigi to orchestrate the jobs.

Could you expand more on how use of scala would allow for this type of workflow?

Thanks much!

Arash Rouhani Kalleh

May 15, 2016, 10:14:31 PM
to neha...@gmail.com, Luigi, staffan....@gmail.com
I think Will means that you can use fewer intermediate steps. For example, merge 3 LuigiSparkTasks into 1 LuigiSparkTask.

Will Jones

May 16, 2016, 12:51:02 AM
to Luigi, staffan....@gmail.com, neha...@gmail.com
Arash nailed it!

Consider just consolidating everything into one task, instead of breaking it apart into a separate task for each RDD. There might be a little more legwork involved in making sure all intermediate steps are idempotent, but Luigi targets can be used to make that easier, e.g.

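from luigi.contrib.s3 import S3FlagTarget

for path in all_paths:
    target = S3FlagTarget(path)
    # Run the child job only if its flag is missing, so reruns skip finished work
    if not target.exists():
        run_child_job(path)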


Based on your posts it sounds like you already have the job written in Scala? If so, there might not be much to gain from rewriting it in Python. Luigi can orchestrate Scala Spark jobs just as well.
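
To make the "one consolidated task" idea a bit more concrete, here is a minimal sketch under some loud assumptions: local `_SUCCESS` flag files stand in for S3 flag targets, `child_job` is a hypothetical callable for the per-subset work, and `shared_context` is whatever object the jobs share (in a real job that would be the single SparkContext, which accepts jobs submitted from separate threads). Nothing here is Luigi or Spark API; it only shows the skip-if-done loop combined with threads.

```python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def run_pending(paths, child_job, shared_context, max_workers=4):
    """Run child_job once per path that has no _SUCCESS flag yet.

    Returns the paths that were actually run, in input order.
    """
    pending = [Path(p) for p in paths if not (Path(p) / "_SUCCESS").exists()]

    def run_one(path):
        # Every thread uses the same context object.
        child_job(shared_context, path)
        path.mkdir(parents=True, exist_ok=True)
        # The flag is written only after the job returned without raising.
        (path / "_SUCCESS").touch()
        return path

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_one, pending))
```

Calling `run_pending` a second time with the same paths does nothing, which is the idempotency the flag check is meant to buy.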

- Will

Arash Rouhani Kalleh

May 16, 2016, 3:47:04 AM
to Will Jones, Luigi, Staffan Arvidsson, Nicholaus Halecky
A side note on how I like to use Luigi with external tools (like Scala Spark):

Personally I much prefer using Spark's Scala API. It is well-typed (yet not verbose), probably faster, and there are probably more relevant Stack Overflow hits when you search for help. Also the IDE support is good (I use Eclipse with Scala). It's also a good excuse to use Scala during work hours. :)

I like to think that luigi should orchestrate a Hive/Crunch/Scala/whatever-job. That job should preferably be a pure math-like operation, for example "Given TrackListens and UserInfos, create Australia's top 50 tracks list" (2 inputs 1 output). Then the issues of date-algebra, atomic moves, backfilling settings (Range logic), always-use-latest-userinfo-table logic, path-parameter logic, datasource logic (using hdfs and gfs simultaneously), dependency management and all that plumbing you don't want to mix in with the logic of the transformation can be done in luigi land.
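
As a toy illustration of the "2 inputs, 1 output" shape, written in Python only for brevity: the function below takes listens and user infos and returns a top list, with no paths, dates, or I/O in it. The record layout (`user_id`, `track_id`, `country` keys) is invented for the example.

```python
from collections import Counter


def top_tracks(track_listens, user_infos, country, n=50):
    """Return the n most-listened track ids among users from the given country."""
    users_in_country = {u["user_id"] for u in user_infos if u["country"] == country}
    counts = Counter(
        listen["track_id"]
        for listen in track_listens
        if listen["user_id"] in users_in_country
    )
    return [track_id for track_id, _ in counts.most_common(n)]
```

Everything about where the inputs come from and where the list is written would then stay on the Luigi side.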

Cheers,
Arash
 