I would like to ask whether there is any documentation, or at least an example, of how to submit a PySpark job from Luigi.
I went through the examples and can only see how to submit a Spark job using a .jar. I assume I should just replace the .jar with the Python file I want to submit, but what if I also need the --py-files option?
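To make the question concrete, this is roughly what I imagine the task looking like. The py_files override is a guess on my part (I don't know whether PySparkTask actually exposes that hook, hence the question), and the WordCount class name and the paths are just placeholders:

import luigi
from luigi.contrib.spark import PySparkTask

class WordCount(PySparkTask):
    # Guess: overriding py_files here (or setting py-files in the [spark]
    # section of the config) is what should end up on spark-submit's --py-files.
    @property
    def py_files(self):
        return ['/mnt/spark/mytask.py']

    def main(self, sc):
        # Plain word count, just to have something for the cluster to do.
        counts = (sc.parallelize(['a', 'b', 'c', 'a', 'b'])
                    .map(lambda s: (s, 1))
                    .reduceByKey(lambda a, b: a + b))
        print counts.collect()

if __name__ == '__main__':
    luigi.run()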
Thank you in advance for any help.
Best regards,
Jan
spark@spark-1:~$ cat MyTaskPySpark.py
import sys

import luigi
from luigi.contrib.spark import PySparkTask

print(sys.version)

class MyTaskPySpark(PySparkTask):
    def main(self, sc):
        #sc.addPyFile(mojskrypt.py)
        sc.addPyFile("/mnt/spark/mytask.py")
        x = sc.parallelize(['a','b','c','a','b'])
        x = x.map(lambda s: (s, 1)).reduceByKey(lambda a, b: a + b)
        print x.collect()

if __name__ == "__main__":
    luigi.run()
spark@spark-1:~$ cat client.cfg
[spark]
spark-submit: /mnt/spark/spark/bin/spark-submit
master: spark://spark-1:7077
py-files: ["MyTaskPySpark.py"]
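One thing I am not sure about is the format of the py-files value. Judging from the spark-submit command in the log below, the bracketed value seems to be passed through verbatim, so maybe Luigi expects a plain comma-separated list of paths here (the format spark-submit's own --py-files takes); that is just a guess on my part:

[spark]
py-files: MyTaskPySpark.py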
spark@spark-1:~$ python MyTaskPySpark.py MyTaskPySpark --local-scheduler
2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2]
DEBUG: Checking if MyTaskPySpark() is complete
/usr/local/lib/python2.7/dist-packages/luigi/worker.py:211: UserWarning: Task MyTaskPySpark() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Scheduled MyTaskPySpark() (PENDING)
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 23193] Worker Worker(salt=993230963, workers=1, host=spark-1, username=spark, pid=23193) running MyTaskPySpark()
INFO: Running: ['/mnt/spark/spark/bin/spark-submit', '--master', 'spark://spark-1:7077', '--deploy-mode', 'client', '--name', 'MyTaskPySpark', '--py-files', '["MyTaskPySpark.py"]', '/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py', '/tmp/MyTaskPySparkXfdkSG/MyTaskPySpark.pickle']
INFO: Spark job stdout:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 57, in <module>
    PySparkRunner(*sys.argv[1:]).run()
  File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 43, in __init__
    self.job = pickle.load(fd)
ImportError: No module named MyTaskPySpark
ERROR: [pid 23193] Worker Worker(salt=993230963, workers=1, host=spark-1, username=spark, pid=23193) failed MyTaskPySpark()
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 137, in run
    new_deps = self._run_get_new_deps()
  File "/usr/local/lib/python2.7/dist-packages/luigi/worker.py", line 88, in _run_get_new_deps
    task_gen = self.task.run()
  File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/spark.py", line 335, in run
    super(PySparkTask, self).run()
  File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/spark.py", line 245, in run
    raise SparkJobError('Spark job failed {0}'.format(repr(args)), out=stdout, err=stderr)
SparkJobError: Spark job failed ['/mnt/spark/spark/bin/spark-submit', '--master', 'spark://spark-1:7077', '--deploy-mode', 'client', '--name', 'MyTaskPySpark', '--py-files', '["MyTaskPySpark.py"]', '/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py', '/tmp/MyTaskPySparkXfdkSG/MyTaskPySpark.pickle']
STDOUT: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 57, in <module>
    PySparkRunner(*sys.argv[1:]).run()
  File "/usr/local/lib/python2.7/dist-packages/luigi/contrib/pyspark_runner.py", line 43, in __init__
    self.job = pickle.load(fd)
ImportError: No module named MyTaskPySpark
INFO: Skipping error email. Set `error-email` in the `core` section of the luigi config file to receive error emails.
DEBUG: 1 running tasks, waiting for next task to finish
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=993230963, workers=1, host=spark-1, username=spark, pid=23193) was stopped. Shutting down Keep-Alive thread
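If it helps narrow things down, I can reproduce the same ImportError outside Spark with a hand-written protocol-0 pickle that refers to the class by module and name, which is how I read what pyspark_runner.py is doing when it unpickles the task (this is only my guess at the mechanism):

import pickle

# A protocol-0 pickle whose payload is just "the global MyTaskPySpark.MyTaskPySpark".
# Unpickling it has to import the MyTaskPySpark module first, so it fails with the
# same "ImportError: No module named MyTaskPySpark" unless that file is on sys.path,
# which is presumably what --py-files is meant to arrange.
pickle.loads("cMyTaskPySpark\nMyTaskPySpark\n.")

So my current suspicion is that my bracketed py-files value is not actually shipping MyTaskPySpark.py to the runner process, but I may be reading this wrong.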
Any ideas?