Error when saving pyspark.ml.Pipeline with fitted NNEstimator


clar...@gmail.com

Sep 1, 2023, 11:18:57 AM
to User Group for BigDL
After wrapping a neural network model as an NNEstimator, I would like to save the fitted pipeline, including the fitted scalers and the NNEstimator. However, when running on the cluster, I encounter a "no such method" error (attached at the end of this message).

The pipeline itself runs without error if I skip the step that saves the pipeline.

Is there a working code example that saves a pyspark.ml.Pipeline containing fitted scalers and an NNEstimator that I could refer to? Thanks!

===============
Error from cluster
===============
An error occurred while calling o8205.save.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.jobject2assoc(Lorg/json4s/JsonAST$JObject;)Lorg/json4s/JsonDSL$JsonListAssoc;
at com.intel.analytics.bigdl.dllib.nnframes.NNModel$.saveImpl(NNEstimator.scala:959)
at com.intel.analytics.bigdl.dllib.nnframes.NNModel$NNModelWriter.saveImpl(NNEstimator.scala:920)
at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)


Xin Qiu

Sep 4, 2023, 10:08:43 PM
to User Group for BigDL
Could you provide your BigDL version, Spark version, Hadoop version, OS version, and some sample code to reproduce this error?

Bests,
-Xin

clar...@gmail.com

Sep 6, 2023, 4:06:51 PM
to User Group for BigDL
Hi Xin,

Thanks for getting back to me. It turns out I can reproduce the error locally on my laptop, so I will share the code I use to reproduce it.

The unit test I use:

from bigdl.dllib.nn.layer import *
from bigdl.dllib.nn.criterion import *
from bigdl.dllib.nnframes.nn_classifier import *
from pyspark.ml import PipelineModel, Pipeline, Transformer
import pyspark.sql.types as T


def test_saving_nnestimator_in_pipeline(spark_session):
    data = spark_session.sparkContext.parallelize([
        ((2.0, 1.0), (1.0, 2.0)),
        ((1.0, 2.0), (2.0, 1.0)),
        ((2.0, 1.0), (1.0, 2.0)),
        ((1.0, 2.0), (2.0, 1.0))])
    schema = T.StructType([
        T.StructField("features", T.ArrayType(T.DoubleType(), False), False),
        T.StructField("label", T.ArrayType(T.DoubleType(), False), False)])
    df = spark_session.createDataFrame(data, schema)
    model = Sequential().add(Linear(2, 2))
    criterion = MSECriterion()
    estimator = NNEstimator(model, criterion, SeqToTensor([2]), ArrayToTensor([2]))\
        .setBatchSize(4).setLearningRate(0.2).setMaxEpoch(40)
    pipeline = Pipeline(stages=[estimator])
    fitted_pipeline = pipeline.fit(df)
    res_from_pipeline = fitted_pipeline.transform(df)
    res_from_pipeline.show(truncate=False)
    """
    +----------+----------+----------------------+
    |features  |label     |prediction            |
    +----------+----------+----------------------+
    |[2.0, 1.0]|[1.0, 2.0]|[1.0044417, 1.9995196]|
    |[1.0, 2.0]|[2.0, 1.0]|[1.9992191, 1.0093627]|
    |[2.0, 1.0]|[1.0, 2.0]|[1.0044417, 1.9995196]|
    |[1.0, 2.0]|[2.0, 1.0]|[1.9992191, 1.0093627]|
    +----------+----------+----------------------+
    """
    fitted_pipeline.save("attempt_to_save_pipeline")

The spark_session is created via:

import os
from pyspark.sql import SparkSession
from bigdl.dllib.nncontext import init_nncontext


def get_local_spark():
    lib_path = os.path.abspath(os.path.dirname(__file__)) + '/../../lib/'
    jars_path = ','.join([
        lib_path + 'spark-nlp-assembly-4.3.1.jar',
        lib_path + 'bigdl-assembly-spark_3.1.3-2.2.0-jar-with-dependencies.jar',
        lib_path + 'sqlite-jdbc-3.27.2.1.jar'])
    spark = SparkSession \
        .builder \
        .master("local[1]") \
        .appName("pytest-unittests") \
        .enableHiveSupport() \
        .config("spark.jars", jars_path) \
        .config("mapred.output.compress", "true") \
        .config("hive.exec.compress.output", "true") \
        .config("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \
        .config("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.sql.shuffle.partitions", "1") \
        .config("spark.driver.host", "localhost") \
        .config("spark.broadcast.compress", "false") \
        .config("spark.shuffle.compress", "false") \
        .config("spark.shuffle.spill.compress", "false") \
        .config("spark.shuffle.reduceLocality.enabled", "false") \
        .config("spark.shuffle.blockTransferService", "nio") \
        .config("spark.scheduler.minRegisteredResourcesRatio", "1.0") \
        .config("spark.speculation", "false") \
        .config("spark.driver.memory", "12G") \
        .config("spark.executor.memory", "6G") \
        .config('spark.default.parallelism', 1) \
        .config('spark.rdd.compress', "false") \
        .config('spark.dynamicAllocation.enabled', "false") \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('spark.sql.hive.convertMetastoreInsertDir', "false") \
        .config("spark.kryoserializer.buffer.max", "2000M") \
        .config("spark.sql.catalogImplementation", "hive") \
        .config("spark.hadoop.datanucleus.autoCreateSchema", "true") \
        .config("spark.hadoop.datanucleus.fixedDatastore", "false") \
        .config("spark.sql.warehouse.dir", os.getcwd()) \
        .getOrCreate()
    # log_info('Execute bigdl.dllib.nncontext.init_nncontext().')
    init_nncontext(cluster_mode="local")
    return spark


spark_session = get_local_spark()

Running test_saving_nnestimator_in_pipeline up to fitted_pipeline.save("attempt_to_save_pipeline") gives me the following error:

Traceback (most recent call last):
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3552, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-5-5c55519259ec>", line 1, in <module>
    fitted_pipeline.save("attempt_to_save_pipeline")
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/util.py", line 226, in save
    self.write().save(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/util.py", line 122, in save
    self.saveImpl(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/pipeline.py", line 242, in saveImpl
    PipelineSharedReadWrite.saveImpl(self.instance, stages, self.sc, path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/pipeline.py", line 378, in saveImpl
    .getStagePath(stage.uid, index, len(stages), stagesDir))
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/bigdl/dllib/nnframes/nn_classifier.py", line 599, in save
    super(NNModelWriter, self).save(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/util.py", line 177, in save
    self._jwrite.save(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o949.save.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.jobject2assoc(Lorg/json4s/JsonAST$JObject;)Lorg/json4s/JsonDSL$JsonListAssoc;
	at com.intel.analytics.bigdl.dllib.nnframes.NNModel$.saveImpl(NNEstimator.scala:959)
	at com.intel.analytics.bigdl.dllib.nnframes.NNModel$NNModelWriter.saveImpl(NNEstimator.scala:920)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)

Due to production requirements at work, I am running the code with the following (not fully consistent) environment:

  • python 3.7
  • bigdl-spark3 (pyspark=3.1.3 and bigdl=2.2.0)
  • PySpark 3.2.1 (I first did "pip install bigdl-spark3" and then "pip install pyspark==3.2.1")
  • I'm running on a MacBook Pro with an M2 chip
Thanks for looking into this for me!

Best,

Clare

Xin Qiu

Sep 7, 2023, 8:44:05 PM
to User Group for BigDL
I have reproduced your error; Spark 3.2.1 is not compatible. You can downgrade Spark to 3.1.3, or wait for the next release (after https://github.com/intel-analytics/BigDL/pull/8786 is merged).
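For context: the NoSuchMethodError is the classic symptom of a binary mismatch — your jar is bigdl-assembly-spark_3.1.3, i.e. compiled against Spark 3.1.3 and its json4s version, but PySpark 3.2.1 ships a different json4s API. A minimal illustrative guard you could run at start-up to fail fast on such a mismatch (the helper name and the check itself are my own sketch, not a BigDL API):

def spark_minor_matches(pyspark_version, bigdl_built_for="3.1.3"):
    """Return True when the installed PySpark's major.minor matches the
    Spark version the BigDL assembly jar was compiled against.

    Mixing Spark minor versions can surface at save() time as a
    NoSuchMethodError, because transitive APIs like json4s drift
    between Spark 3.1 and 3.2.
    """
    return pyspark_version.split(".")[:2] == bigdl_built_for.split(".")[:2]


# The two configurations from this thread:
spark_minor_matches("3.1.3")  # matching jar and PySpark: safe
spark_minor_matches("3.2.1")  # PySpark 3.2.1 against a 3.1.3 jar: expect breakage

In practice you would call this with pyspark.__version__ right after importing pyspark, before building the SparkSession.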

Bests,
-Xin
