Documentation or example of how to use pyspark from luigi


zik...@gmail.com

unread,
Oct 19, 2014, 9:40:34 AM10/19/14
to luigi...@googlegroups.com
Hi,

I would like to ask if there is any documentation, or at least an example, of how to submit a PySpark job from Luigi?

I went through the examples and can only see how to submit a Spark job using a .jar. I assume I could just replace the .jar with the Python file I'd like to submit, but what if I also want to use the --py-files option?

Thank you in advance for any help.

Best regards,
Jan

Sebastian Csar

unread,
Nov 25, 2014, 3:09:48 PM11/25/14
to luigi...@googlegroups.com, zik...@gmail.com
I don't think there's anything particular to Luigi about using pyspark versus running any other Python that wants to make use of pyspark. It did, however, take me a while to figure out. I do the following:
files = ["foo.py", "bar.py"]
sc = SparkContext("master info", "App Name", pyFiles=files)
where "master info" is whatever you would pass on the command line with --master.

jac...@skimlinks.com

unread,
Jan 8, 2015, 1:25:54 PM1/8/15
to luigi...@googlegroups.com, zik...@gmail.com
I am also looking for something like:
> spark-submit myscript.py --py-files mylib.zip
Unfortunately this does not seem to work. When a slave runs the line
"import mylib"
it stops with an error:
ImportError: No module named mylib

zik...@gmail.com

unread,
Jan 8, 2015, 4:22:27 PM1/8/15
to luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com
Actually, I contributed this to luigi: https://github.com/spotify/luigi/pull/512. Specifically, it's the PySpark1xJob class in luigi/contrib/spark.py.
It's one possible way of submitting PySpark jobs from luigi, but it seems it's still only available in the version on GitHub, not in the version you can install via pip.

Reading Sebastian's suggestion, I must say I like it more than using PySpark1xJob, but I haven't tried running a PySpark job his way yet.

nguye...@gmail.com

unread,
Mar 5, 2015, 4:23:01 PM3/5/15
to luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com
Could you post an example of how to implement PySpark1xJob in Luigi? I am currently trying to use luigi to run my Spark jobs but don't know the syntax. I've read the API docs; an example implementation would help me greatly. Thank you!

zik...@gmail.com

unread,
Mar 5, 2015, 4:28:41 PM3/5/15
to luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com, nguye...@gmail.com
Hi,
Here is an example with a brief explanation. If I have time I'll also try to PR an example directly to luigi, but these are busy days for me.

class MyPySparkJob(PySpark1xJob):

    data_path = luigi.Parameter()
    output_path = luigi.Parameter()

    def requires(self):
        # standard requirements in luigi
        return [S3PathTask(self.data_path)]

    def program(self):
        # name/path of the spark program
        return 'spark_program.py'

    def py_files(self):
        # zipped dependencies (libraries not installed via pip on the whole cluster)
        # does not have to be implemented -> no files are included
        return ['my_project.zip']

    def job_args(self):
        # arguments (placeholders a1, a2 here) that you'd like to pass to your spark_program.py
        # does not have to be implemented -> no args are included
        return [a1, a2]

    def output(self):
        # your output dependencies
        return [S3FlagTarget(self.output_path)]
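To complement the py_files() hook above, here is a hedged, stdlib-only sketch of one way to build the my_project.zip it returns; the helper name zip_py_files and the package name "mylib" are illustrative, not part of the luigi API:

```python
import os
import sys
import tempfile
import zipfile

def zip_py_files(source_dir, zip_path):
    """Pack every .py file under source_dir into zip_path, keeping the
    relative layout so imports resolve once the zip is on sys.path."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, names in os.walk(source_dir):
            for name in names:
                if name.endswith(".py"):
                    full = os.path.join(root, name)
                    zf.write(full, os.path.relpath(full, source_dir))
    return zip_path

# Demo: a tiny package, zipped, then imported straight from the zip
# (this mirrors what shipping the zip via --py-files arranges on a worker).
src = tempfile.mkdtemp()
os.makedirs(os.path.join(src, "mylib"))
with open(os.path.join(src, "mylib", "__init__.py"), "w") as f:
    f.write("VERSION = 1\n")

zip_path = zip_py_files(src, os.path.join(tempfile.mkdtemp(), "my_project.zip"))
sys.path.insert(0, zip_path)
import mylib  # no ImportError once the zip is on the path
```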

Thierry Jossermoz

unread,
Mar 5, 2015, 9:17:48 PM3/5/15
to luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com, nguye...@gmail.com
Also note that in the upcoming luigi release (1.1.0), PySpark1xJob, Spark1xJob and SparkJob are deprecated in favour of SparkSubmitTask, which is not tightly coupled to YARN and also supports local, standalone and Mesos clusters.

The PR is not merged yet, so let me know if you have any comments: https://github.com/spotify/luigi/pull/812

A python example for SparkSubmitTask can be found here: https://github.com/jthi3rry/luigi/blob/master/examples/pyspark_wc.py

nguye...@gmail.com

unread,
Mar 6, 2015, 12:26:41 PM3/6/15
to luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com, nguye...@gmail.com
Thank you both for the help and code example, it works now :-).

Thierry - I would also like to notify you of a small bug in your code. I noticed you use sys.argv[0] and sys.argv[1] for the Spark code, but it should be sys.argv[1] and sys.argv[2], since sys.argv[0] is the filename.

Cheers to you guys!

Thierry Jossermoz

unread,
Mar 6, 2015, 5:54:31 PM3/6/15
to nguye...@gmail.com, luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com
Thanks Cory, I fixed the example.



nguye...@gmail.com

unread,
Mar 23, 2015, 3:48:29 AM3/23/15
to luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com, nguye...@gmail.com
Thierry,

With the new Spark module, I am having trouble getting luigi to pass my dependency file on to my Spark job:

class MySparkJob(SparkSubmitTask):

    name = luigi.Parameter()
    app = luigi.Parameter()

    input_path = luigi.Parameter()
    output_path = luigi.Parameter()

    # tried this way
    py_files = 'deps.zip'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    # also tried this way
    def py_files(self):
        return ['deps.zip']

    def input(self):
        return HdfsTarget(self.input_path)

    def output(self):
        return HdfsTarget(self.output_path)

but when I run the luigi script, the verbose output doesn't seem to pass the dependency files at all:

INFO: Running: ['/home/hadoop/spark/bin/spark-submit', '--master', 'yarn', '--name', 'PySpark Luigi Test', 'my_spark_script', 'my_input_location', 'my_output_location']

import tldextract #remember to ship it with dependency in cluster environment
ImportError: No module named tldextract

Thanks for your help, Cheers!
Cory

Thierry Jossermoz

unread,
Mar 23, 2015, 3:41:57 PM3/23/15
to nguye...@gmail.com, luigi...@googlegroups.com, zik...@gmail.com, jac...@skimlinks.com
Hi Cory,

py_files is a property that needs to be a list or tuple. So based on your two variants, it should work if you try:

py_files = ["deps.zip"]

Or

@property
def py_files(self):
    return ["deps.zip"]

Let me know if it doesn't :-)

Cheers,
Thierry
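For a sense of why the list form matters, here is a rough, stdlib-only sketch of the shape spark-submit expects: a single comma-separated value after --py-files. The helper build_py_files_args is illustrative, not the actual SparkSubmitTask code:

```python
def build_py_files_args(py_files):
    """Return spark-submit arguments for a list/tuple of dependency paths."""
    if not py_files:
        return []
    # spark-submit takes ONE comma-separated value after --py-files
    return ["--py-files", ",".join(py_files)]

# The list/tuple form serializes cleanly:
args = build_py_files_args(["deps.zip", "helpers.py"])
# args == ["--py-files", "deps.zip,helpers.py"]

# A bare string is also iterable, but character by character:
bad = build_py_files_args("deps.zip")
# bad == ["--py-files", "d,e,p,s,.,z,i,p"] -- not a usable file list
```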

mac...@brynski.pl

unread,
Sep 1, 2015, 11:12:55 AM9/1/15
to Luigi, zik...@gmail.com, jac...@skimlinks.com, nguye...@gmail.com
Hi,
I have the same problem.

spark@spark-1:~$ cat MyTaskPySpark.py
import sys
import luigi
from luigi.contrib.spark import PySparkTask


print(sys.version)

class MyTaskPySpark(PySparkTask):
    def main(self, sc):
        # sc.addPyFile("mojskrypt.py")
        sc.addPyFile("/mnt/spark/mytask.py")
        x = sc.parallelize(['a', 'b', 'c', 'a', 'b'])
        x = x.map(lambda s: (s, 1)).reduceByKey(lambda a, b: a + b)
        print x.collect()

if __name__ == "__main__":
    luigi.run()

spark@spark-1:~$ cat client.cfg
[spark]
spark-submit: /mnt/spark/spark/bin/spark-submit
master: spark://spark-1:7077
py-files: ["MyTaskPySpark.py"]

spark@spark-1:~$ python MyTaskPySpark.py MyTaskPySpark --local-scheduler
2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2]
DEBUG: Checking if MyTaskPySpark() is complete
/usr/local/lib/python2.7/dist-packages/luigi/worker.py:211: UserWarning: Task MyTaskPySpark() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Scheduled MyTaskPySpark() (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 23193] Worker Worker(salt=993230963, workers=1, host=spark-1, username=spark, pid=23193) running MyTaskPySpark()
INFO: Running: ['/mnt/spark/spark/bin/spark-submit', '--master', 'spark://spark-1:7077', '--deploy-mode', 'client', '--name', 'MyTaskPySpark', '--py-files', '["MyTaskPySpark.py"]', '/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py', '/tmp/MyTaskPySparkXfdkSG/MyTaskPySpark.pickle']
INFO: Spark job stdout:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 57, in <module>
PySparkRunner(*sys.argv[1:]).run()
File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 43, in __init__
self.job = pickle.load(fd)
ImportError: No module named MyTaskPySpark

ERROR: [pid 23193] Worker Worker(salt=993230963, workers=1, host=spark-1, username=spark, pid=23193) failed MyTaskPySpark()
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 137, in run
new_deps = self._run_get_new_deps()
File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 88, in _run_get_new_deps
task_gen = self.task.run()
File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/spark.py", line 335, in run
super(PySparkTask, self).run()
File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/spark.py", line 245, in run
raise SparkJobError('Spark job failed {0}'.format(repr(args)), out=stdout, err=stderr)
SparkJobError: Spark job failed ['/mnt/spark/spark/bin/spark-submit', '--master', 'spark://spark-1:7077', '--deploy-mode', 'client', '--name', 'MyTaskPySpark', '--py-files', '["MyTaskPySpark.py"]', '/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py', '/tmp/MyTaskPySparkXfdkSG/MyTaskPySpark.pickle']
STDOUT: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 57, in <module>
PySparkRunner(*sys.argv[1:]).run()
File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 43, in __init__
self.job = pickle.load(fd)
ImportError: No module named MyTaskPySpark

INFO: Skipping error email. Set `error-email` in the `core` section of the luigi config file to receive error emails.
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=993230963, workers=1, host=spark-1, username=spark, pid=23193) was stopped. Shutting down Keep-Alive thread


Any ideas?

Noah Maze

unread,
May 12, 2016, 12:35:49 PM5/12/16
to Luigi, zik...@gmail.com, jac...@skimlinks.com, nguye...@gmail.com, mac...@brynski.pl
I'm having the same problem, but it goes away if I run my task from a directory on the Python path. I tried to fix it by adding the file to '--python-files', but it didn't help.
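That observation matches how pickle behaves: a pickle stores only the *name* of the class's defining module, and unpickling re-imports it, so the runner fails unless the task's module is importable on the worker. A stdlib-only illustration, with a hypothetical module name "mytaskmodule" (no Spark needed):

```python
import os
import pickle
import sys
import tempfile

# Write a throwaway module and put its directory on the path.
workdir = tempfile.mkdtemp()
with open(os.path.join(workdir, "mytaskmodule.py"), "w") as f:
    f.write("class MyTask(object):\n    x = 1\n")

sys.path.insert(0, workdir)
import mytaskmodule
blob = pickle.dumps(mytaskmodule.MyTask())

# While the module is importable, unpickling works:
obj = pickle.loads(blob)
assert obj.x == 1

# Take the module off the path and out of the cache, and it breaks with
# the same error the Spark workers report:
sys.path.remove(workdir)
del sys.modules["mytaskmodule"]
got_import_error = False
try:
    pickle.loads(blob)
except ImportError:  # "No module named mytaskmodule"
    got_import_error = True
assert got_import_error
```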