I am working on a project to obfuscate some PySpark code into binaries with Cython, and I have a problem with the Spark workers accessing the Cython dependencies.
I have tried to simplify the project. Here is the structure of the code:
├── main.py
├── modules
│   ├── __init__.py
│   └── toolbox.pyx
├── pyspark_script.pyx
└── setup.py
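main.py is only a thin, plain-Python entry point; conceptually it boils down to something like this (a minimal sketch, the exact content is not important for the problem):

# main.py -- kept as plain Python so it can be submitted directly;
# importing the compiled module runs the Spark code in pyspark_script
import pyspark_script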
Here is pyspark_script.pyx (the Spark code that gets compiled):
from modules.toolbox import add_one
from modules import toolbox
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
x = 1
print(f"{x} turns into {toolbox.add_one(x)}")
sc = SparkContext(appName="TestModules")
res_rdd = sc.parallelize(range(0, 10), 3).map(add_one)
print(f"result: {res_rdd.collect()}")
My problem is the following:
I cythonize my Spark code and my toolbox module with the following setup.py file:
from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext
ext_modules = [
    Extension("pyspark_script", ["pyspark_script.pyx"]),
    Extension("modules.toolbox", ["modules/toolbox.pyx"]),
]
setup(name="Sample Program", cmdclass={"build_ext": build_ext}, ext_modules=ext_modules)
I compile the dependencies with:
$ python setup.py build_ext --inplace
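After this step the compiled modules sit next to their sources (build_ext --inplace places each extension according to its dotted module name), so I end up with roughly the following layout (the exact .so suffix depends on the Python version and platform; generated .c files and the build/ directory are omitted):

├── main.py
├── modules
│   ├── __init__.py
│   ├── toolbox.cpython-XY-darwin.so
│   └── toolbox.pyx
├── pyspark_script.cpython-XY-darwin.so
├── pyspark_script.pyx
└── setup.py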
I then submit the job to Spark with spark-submit, and I get an error: the workers can't find the module:
21/09/22 18:34:21 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 4) (127.0.0.1 executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/usr/local/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/usr/local/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 160, in _read_with_length
return self.loads(obj)
File "/usr/local/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 430, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'modules'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
...
I found a hack to circumvent this problem: I remove the package path from the extension name in my setup file, i.e. the entry that started with Extension("modules.toolbox", ... becomes Extension("toolbox", ....
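In other words, the ext_modules list of the hacked version looks like this (only the module name changes; the source path stays the same):

ext_modules = [
    Extension("pyspark_script", ["pyspark_script.pyx"]),
    # package prefix dropped from the module name; source path unchanged
    Extension("toolbox", ["modules/toolbox.pyx"]),
]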
The toolbox .so file is then produced at the root of the project, and I move it manually into the modules directory.
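Roughly like this (the exact .so filename depends on the Python version and platform):

$ mv toolbox.cpython-*.so modules/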
The workers then do find the toolbox module, and I get the expected output:
1 turns into 2
(...)
result: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
(...)
So, my questions:
Why can't the workers find my toolbox module when I declare it in the Extension() class of the setup file as "modules.toolbox", i.e. Extension("modules.toolbox", ["modules/toolbox.pyx"])?
And why does it work when I write Extension("toolbox", ["modules/toolbox.pyx"]) instead?
Finally, why does the master find the module in both cases, while the workers don't?
More generally, I would be very grateful for any information about how the master and the workers look up dependencies, and how the two differ from each other.
Thank you!
Godefroy