The unit test I use:
from bigdl.dllib.nn.layer import * from bigdl.dllib.nn.criterion import * from bigdl.dllib.nnframes.nn_classifier import * from pyspark.ml import PipelineModel, Pipeline, Transformer import pyspark.sql.types as T def test_saving_nnestimator_in_pipeline(spark_session): data = spark_session.sparkContext.parallelize([ ((2.0, 1.0), (1.0, 2.0)), ((1.0, 2.0), (2.0, 1.0)), ((2.0, 1.0), (1.0, 2.0)), ((1.0, 2.0), (2.0, 1.0))]) schema = T.StructType([ T.StructField("features", T.ArrayType(T.DoubleType(), False), False), T.StructField("label", T.ArrayType(T.DoubleType(), False), False)]) df = spark_session.createDataFrame(data, schema) model = Sequential().add(Linear(2, 2)) criterion = MSECriterion() estimator = NNEstimator(model, criterion, SeqToTensor([2]), ArrayToTensor([2]))\ .setBatchSize(4).setLearningRate(0.2).setMaxEpoch(40) pipeline = Pipeline(stages=[estimator]) fitted_pipeline = pipeline.fit(df) res_from_pipeline = fitted_pipeline.transform(df) res_from_pipeline.show(truncate=False) """ +----------+----------+----------------------+ |features |label |prediction | +----------+----------+----------------------+ |[2.0, 1.0]|[1.0, 2.0]|[1.0044417, 1.9995196]| |[1.0, 2.0]|[2.0, 1.0]|[1.9992191, 1.0093627]| |[2.0, 1.0]|[1.0, 2.0]|[1.0044417, 1.9995196]| |[1.0, 2.0]|[2.0, 1.0]|[1.9992191, 1.0093627]| +----------+----------+----------------------+ """ fitted_pipeline.save("attempt_to_save_pipeline")The spark_session is created via:
import os from pyspark.sql import SparkSession from bigdl.dllib.nncontext import init_nncontext def get_local_spark(): lib_path = os.path.abspath(os.path.dirname(__file__)) + '/../../lib/' jars_path = ','.join([ lib_path + 'spark-nlp-assembly-4.3.1.jar', lib_path + 'bigdl-assembly-spark_3.1.3-2.2.0-jar-with-dependencies.jar', lib_path + 'sqlite-jdbc-3.27.2.1.jar']) spark = SparkSession \ .builder \ .master("local[1]") \ .appName("pytest-unittests") \ .enableHiveSupport() \ .config("spark.jars", jars_path) \ .config("mapred.output.compress", "true") \ .config("hive.exec.compress.output", "true") \ .config("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") \ .config("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec") \ .config("hive.exec.dynamic.partition.mode", "nonstrict") \ .config("spark.sql.shuffle.partitions", "1") \ .config("spark.driver.host", "localhost") \ .config("spark.broadcast.compress", "false") \ .config("spark.shuffle.compress", "false") \ .config("spark.shuffle.spill.compress", "false") \ .config("spark.shuffle.reduceLocality.enabled", "false") \ .config("spark.shuffle.blockTransferService", "nio") \ .config("spark.scheduler.minRegisteredResourcesRatio", "1.0") \ .config("spark.speculation", "false") \ .config("spark.driver.memory", "12G") \ .config("spark.executor.memory", "6G") \ .config('spark.default.parallelism', 1) \ .config('spark.rdd.compress', "false") \ .config('spark.dynamicAllocation.enabled', "false") \ .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \ .config('spark.sql.hive.convertMetastoreInsertDir', "false") \ .config("spark.kryoserializer.buffer.max", "2000M") \ .config("spark.sql.catalogImplementation", "hive") \ .config("spark.hadoop.datanucleus.autoCreateSchema", "true") \ .config("spark.hadoop.datanucleus.fixedDatastore", "false") \ .config("spark.sql.warehouse.dir", os.getcwd()) \ .getOrCreate() # log_info('Execute bigdl.dllib.nncontext.init_nncontext().') 
init_nncontext(cluster_mode="local") return spark spark_session = get_local_spark()Running test_saving_nnestimator_in_pipeline up to fitted_pipeline.save("attempt_to_save_pipeline") gives me the following error:
Traceback (most recent call last):
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3552, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-5-5c55519259ec>", line 1, in <module>
    fitted_pipeline.save("attempt_to_save_pipeline")
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/util.py", line 226, in save
    self.write().save(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/util.py", line 122, in save
    self.saveImpl(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/pipeline.py", line 242, in saveImpl
    PipelineSharedReadWrite.saveImpl(self.instance, stages, self.sc, path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/pipeline.py", line 378, in saveImpl
    .getStagePath(stage.uid, index, len(stages), stagesDir))
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/bigdl/dllib/nnframes/nn_classifier.py", line 599, in save
    super(NNModelWriter, self).save(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/ml/util.py", line 177, in save
    self._jwrite.save(path)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "$HOME/anaconda3/envs/cos_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o949.save.
: java.lang.NoSuchMethodError: org.json4s.JsonDSL$.jobject2assoc(Lorg/json4s/JsonAST$JObject;)Lorg/json4s/JsonDSL$JsonListAssoc;
	at com.intel.analytics.bigdl.dllib.nnframes.NNModel$.saveImpl(NNEstimator.scala:959)
	at com.intel.analytics.bigdl.dllib.nnframes.NNModel$NNModelWriter.saveImpl(NNEstimator.scala:920)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:168)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)

Due to production requirements at work, I am running the code with the following (not fully consistent) environment: