Pyspark DataFrameWriter.save() Error

Weng Shao Fong

Sep 26, 2016, 1:08:46 AM
to mongodb-user
Hi, I'm new to MongoDB and Apache Spark. I tried to execute introduction.py, but it keeps failing with a save() error.

Here is the command I wrote:
$ bin/spark-submit --master "local[4]" --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" --packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0 introduction.py

After that, Spark starts and this error appears:

Traceback (most recent call last):
  File "/home/wengshao/intro.py", line 44, in <module>
    characters.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 530, in save
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o57.save.
: java.lang.AbstractMethodError: com.mongodb.spark.sql.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:211)
        at java.lang.Thread.run(Thread.java:745)

Did I miss anything?

Ross Lawley

Sep 26, 2016, 6:05:28 AM
to mongod...@googlegroups.com
Hi,

What version of Apache Spark are you running?

Ross

--


{ name     : "Ross Lawley",
  title    : "Senior Software Engineer",
  location : "London, UK",
  twitter  : ["@RossC0", "@MongoDB"],
  facebook : "MongoDB" }


Weng Shao Fong

Sep 27, 2016, 2:13:08 AM
to mongodb-user
Hi,
My Apache Spark version is spark-2.0.0-bin-hadoop2.7 and my Scala version is 2.10.6.

Thank you

Weng Shao

Ross Lawley

Sep 27, 2016, 6:02:24 AM
to mongodb-user
Hi Weng,

That's the reason: org.mongodb.spark:mongo-spark-connector_2.10:1.0.0 supports Spark 1.6.x.

You will need to use the release candidate for Spark 2.0:

 ./bin/spark-submit --master "local[4]" --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection" --packages org.mongodb.spark:mongo-spark-connector_2.10:2.0.0-rc0 introduction.py

And that will work.
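
For reference, here is a minimal sketch of what the write side of introduction.py looks like (the DataFrame name comes from your traceback, but the SparkSession setup and sample rows are placeholders, not the actual file). It picks up spark.mongodb.output.uri from the --conf values passed to spark-submit:

from pyspark.sql import SparkSession

# SparkSession picks up spark.mongodb.* settings from the spark-submit --conf values
spark = SparkSession.builder.appName("introduction").getOrCreate()

# Placeholder data; the real introduction.py builds its own DataFrame
characters = spark.createDataFrame(
    [("Bilbo Baggins", 50), ("Gandalf", 1000)], ["name", "age"])

# Same call as in your traceback; it succeeds once the Spark 2.0
# compatible connector package is on the classpath
characters.write \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .mode("overwrite") \
    .save()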

Ross

Weng Shao Fong

Sep 27, 2016, 7:09:08 AM
to mongodb-user
Hi Ross,

Thank you for your help. I think I've encountered another error.

When I executed introduction.py, the .show() call raised this error:


Traceback (most recent call last):
  File "/home/wengshao/intro.py", line 55, in <module>
    centenarians.show()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 287, in show
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o62.showString.
: java.lang.UnsupportedOperationException: The DefaultMongoPartitioner requires MongoDB >= 3.2
        at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:58)
        at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:137)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:326)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
        at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2189)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1925)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1924)
        at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2562)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:1924)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2139)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:211)
        at java.lang.Thread.run(Thread.java:745)

Can you please help me?
Thank you

Weng Shao

Ross Lawley

Sep 27, 2016, 8:35:49 AM
to mongodb-user
Hi Weng,

The exception is: "The DefaultMongoPartitioner requires MongoDB >= 3.2". This is because the default partitioner uses the $sample aggregation pipeline operator, which was introduced in MongoDB 3.2. To read from a collection, the Mongo Spark Connector partitions the collection so that Spark can take advantage of parallelism. There are a number of partitioners to choose from; since this is just a trial run, you can use the MongoPaginateBySizePartitioner:


./bin/spark-submit --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.coll?readPreference=primaryPreferred" \
    --conf "spark.mongodb.input.partitioner=MongoPaginateBySizePartitioner" \
    --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.coll" \
    --packages org.mongodb.spark:mongo-spark-connector_2.10:2.0.0-rc0 \
    introduction.py


Each partitioner has pros and cons, so it's important to pick the best one for your use case.

Also, please note that SparkConf is only one way to configure the MongoDB Spark Connector; you can also pass the values as options to the DataFrameReader/Writer on a per-operation basis.
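
A minimal sketch of that per-operation form (not from the thread; it assumes the connector's convention of dropping the spark.mongodb.input. prefix from option names, and the test.coll URI from the command above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("introduction").getOrCreate()

# Pass the uri and partitioner directly to the reader instead of via SparkConf
df = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/test.coll") \
    .option("partitioner", "MongoPaginateBySizePartitioner") \
    .load()

df.show()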

Hope that helps,

Ross

Weng Shao Fong

Sep 27, 2016, 8:42:57 AM
to mongodb-user
Hi Ross,

It works!
Thank you for your help.
Really appreciate it.

Weng Shao