“state should be: open” when using mapPartitions with Spark connector

CalmAmity

Apr 25, 2019, 5:52:22 PM
to mongodb-user
Hello!

I posted the question below (with slightly better formatting) to StackOverflow (https://stackoverflow.com/questions/55849544), but I thought I'd try my luck here as well.

Thanks in advance for any answers!

=========

The setup
=========
I have a simple Spark application that uses `mapPartitions` to transform an RDD. As part of this transformation, I retrieve some necessary data from a Mongo database. The connection from the Spark worker to the Mongo database is managed using the MongoDB Connector for Spark (https://docs.mongodb.com/spark-connector/current/).

I'm using `mapPartitions` instead of the simpler `map` because there is some relatively expensive setup that is only required once for all elements in a partition. If I were to use `map` instead, this setup would have to be repeated for every element individually.
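Schematically, the difference looks like this (a minimal sketch; `rdd`, `expensiveSetup`, and `process` are hypothetical stand-ins for my actual code):

```
// With `map`, any setup inside the function body runs once per element:
rdd.map { element =>
  val resources = expensiveSetup() // repeated for every single element
  process(resources, element)
}

// With `mapPartitions`, the setup runs once per partition and is then
// shared by all elements of that partition:
rdd.mapPartitions { elements =>
  val resources = expensiveSetup()
  elements.map(element => process(resources, element))
}
```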

The problem
===========
When one of the partitions in the source RDD becomes large enough, the transformation fails with the message

`IllegalStateException: state should be: open`

or, occasionally, with

`IllegalStateException: The pool is closed`

The code
========
Below is the code of a simple Scala application with which I can reproduce the issue:
```
package my.package

import com.mongodb.spark.MongoConnector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.bson.Document

object MySparkApplication {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("MySparkApplication")
      .master(???) // The Spark master URL
      .config("spark.jars", ???) // The path at which the application's fat JAR is located.
      .config("spark.scheduler.mode", "FAIR")
      .config("spark.mongodb.keep_alive_ms", "86400000")
      .getOrCreate()

    val mongoConnector: MongoConnector = MongoConnector(Map(
      "uri" -> ??? // The MongoDB URI.
      , "spark.mongodb.keep_alive_ms" -> "86400000"
      , "keep_alive_ms" -> "86400000"
    ))

    val localDocumentIds: Seq[Long] = Seq.range(1L, 100L)
    val documentIdsRdd: RDD[Long] = sparkSession.sparkContext.parallelize(localDocumentIds)

    val result: RDD[Document] = documentIdsRdd.mapPartitions { documentIdsIterator =>
      mongoConnector.withMongoClientDo { mongoClient =>
        val collection = mongoClient.getDatabase("databaseName").getCollection("collectionName")
        // Some expensive query that should only be performed once for every partition.
        collection.find(new Document("_id", 99999L)).first()

        documentIdsIterator.map { documentId =>
          // An expensive operation that does not interact with the Mongo database.
          Thread.sleep(1000)
          collection.find(new Document("_id", documentId)).first()
        }
      }
    }

    val resultLocal = result.collect()
  }
}
```

The stack trace
===============
Below is the stack trace returned by Spark when I run the application above:
```
Driver stacktrace:
[...]
at my.package.MySparkApplication.main(MySparkApplication.scala:41)
at my.package.MySparkApplication.main(MySparkApplication.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalStateException: state should be: open
at com.mongodb.assertions.Assertions.isTrue(Assertions.java:70)
at com.mongodb.connection.BaseCluster.getDescription(BaseCluster.java:152)
at com.mongodb.Mongo.getConnectedClusterDescription(Mongo.java:885)
at com.mongodb.Mongo.createClientSession(Mongo.java:877)
at com.mongodb.Mongo$3.getClientSession(Mongo.java:866)
at com.mongodb.Mongo$3.execute(Mongo.java:823)
at com.mongodb.FindIterableImpl.first(FindIterableImpl.java:193)
at my.package.MySparkApplication$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(MySparkApplication.scala:36)
at my.package.MySparkApplication$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(MySparkApplication.scala:33)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```

The research I have done
========================
I have found several people asking about this issue, and it seems that in all of their cases, the problem turned out to be that they were using the Mongo client after it had been closed. As far as I can tell, this is not happening in my application: opening and closing the connection should be handled by the Mongo-Spark connector, and I would expect the client to be closed only after the function passed to `mongoConnector.withMongoClientDo` returns.

I did manage to discover that the issue does not arise for the very first element in the RDD. It seems instead that a number of elements are being processed successfully, and that the failure only occurs once the process has taken a certain amount of time. This amount of time seems to be on the order of 5 to 15 seconds.

The above leads me to believe that something is automatically closing the client once it has been active for a certain amount of time, *even though it is still being used*.

As you can tell from my code, I have discovered that the Mongo-Spark connector exposes a configuration property, `spark.mongodb.keep_alive_ms`, that, according to the connector documentation, controls "The length of time to keep a MongoClient available for sharing". Its default value is 5 seconds, so this seemed like a useful thing to try. In the application above, I attempt to set it to an entire day in three different ways, with zero effect.

The documentation does state that this specific property "can only be configured via a System Property". I think that this is what I'm doing (by setting the property when initialising the Spark session and/or Mongo connector), but I'm not entirely sure. It also seems to be impossible to verify the setting once the Mongo connector has been initialised.
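If "System Property" is instead meant in the Java sense, the most literal way to set it would presumably be something like the sketch below, executed before the connector is first used; whether that value would ever reach the worker JVMs is exactly the part I'm unsure about:

```
// Hypothetical attempt: set the keep-alive as a Java system property
// before any MongoConnector is created, in the hope that the
// MongoClientCache picks it up when it is initialised.
System.setProperty("spark.mongodb.keep_alive_ms", "86400000")
// ... then build the SparkSession and MongoConnector as in the code above.
```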

Another StackOverflow question mentions that I should try setting the `maxConnectionIdleTime` option in the `MongoClientOptions`, but as far as I can tell it is not possible to set these options through the connector.
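With the plain Java driver, this option would be set roughly as follows (a sketch for illustration only; the point is that the connector offers no equivalent hook, as far as I can see):

```
import com.mongodb.{MongoClient, MongoClientOptions, MongoClientURI}

// A value of 0 means connections are never closed for being idle.
val optionsBuilder = MongoClientOptions.builder().maxConnectionIdleTime(0)
val client = new MongoClient(new MongoClientURI(??? /* the MongoDB URI */, optionsBuilder))
```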

As a sanity check, I tried replacing the use of `mapPartitions` with a functionally equivalent use of `map`. The issue disappeared, which is probably because the connection to the Mongo database is re-initialised for each individual element of the RDD. However, as mentioned above, this approach would have significantly worse performance because I would end up repeating expensive setup work for every element in the RDD.
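For reference, the `map`-based variant looked roughly like this:

```
// Functionally equivalent `map` version: a client is obtained from the
// connector anew for every element, so no element ever sees a closed
// client, but the per-element overhead is much higher.
val result: RDD[Document] = documentIdsRdd.map { documentId =>
  mongoConnector.withMongoClientDo { mongoClient =>
    val collection = mongoClient.getDatabase("databaseName").getCollection("collectionName")
    Thread.sleep(1000) // The expensive operation that does not touch Mongo.
    collection.find(new Document("_id", documentId)).first()
  }
}
```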

Out of curiosity I also tried replacing the call to `mapPartitions` with a call to `foreachPartition`, also replacing the call to `documentIdsIterator.map` with `documentIdsIterator.foreach`. The issue also disappeared in this case. I have no idea why this would be, but because I need to transform my RDD, this is also not an acceptable approach.

The kind of answer I am looking for
===================================
- "You actually *are* closing the client prematurely, and here's where: [...]"
- "This is a known issue in the Mongo-Spark connector, and here's a link to their issue tracker: [...]"
- "You are setting the `spark.mongodb.keep_alive_ms` property incorrectly, this is how you should do it: [...]"
- "It is possible to verify the value of `spark.mongodb.keep_alive_ms` on your Mongo connector, and here's how: [...]"
- "It is possible to set `MongoClientOptions` such as `maxConnectionIdleTime` through the Mongo connector, and here's how: [...]"

Robert Cochran

Apr 25, 2019, 8:26:46 PM
to mongodb-user
Hi,

This is far and away the best problem statement I have seen on this forum in the year or so that I've been on it. I am sorry that I cannot offer useful help since I don't have any Spark experience. Your written statement is superb.

Thanks so much

Bob

Ross Lawley

Apr 26, 2019, 5:19:21 AM
to mongod...@googlegroups.com
Hi CalmAmity,

Thanks for the great post describing the issue. I've created https://jira.mongodb.org/browse/SPARK-243 to track it and will try to review it in the near future.

As MongoClients aren't serializable, they have to be created on each worker node. To ensure that the worker tasks are efficient and don't each have to set up their own MongoClient, there is a MongoClientCache they can all share. It looks like there could be an issue with the timeouts, or possibly it's just a difference in how Spark handles and processes `documentIdsIterator.map` compared to `documentIdsIterator.foreach`. Hopefully, further debugging will provide some answers.
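One difference that may be relevant (a minimal illustration, independent of the connector): `map` on a Scala `Iterator` is lazy, while `foreach` is eager, so the function passed to `withMongoClientDo` can return a not-yet-consumed iterator:

```
val source = Iterator(1, 2, 3)
val mapped = source.map { i => println(s"processing $i"); i }
// Nothing has been printed yet: the mapping function only runs when the
// iterator is consumed, which may be long after `map` itself returned.
mapped.toList

// `foreach`, by contrast, consumes the iterator before returning:
Iterator(1, 2, 3).foreach(i => println(s"processing $i"))
```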

Ross

--

{ name     : "Ross Lawley",
  title    : "Staff Engineer",
  location : "Remote, EMEA (UK)",
  twitter  : ["@RossC0", "@MongoDB"],
  facebook : "MongoDB" }

CalmAmity

May 6, 2019, 10:46:10 AM
to mongodb-user
Hi Ross,

Thanks for the information, and for creating the JIRA ticket. I did some more digging, and discovered the following (also posted to StackOverflow and the JIRA ticket):

The phrase 'System Property' used in the connector's documentation refers to a Java system property, set using `System.setProperty("spark.mongodb.keep_alive_ms", desiredValue)` or the command line option `-Dspark.mongodb.keep_alive_ms=desiredValue`. This value is then read by the `MongoConnector` singleton object and passed to the `MongoClientCache`. However, none of the approaches for setting this property actually works:

- Calling `System.setProperty()` from the driver program sets the value only in the JVM for the Spark driver program, while the value is needed in the Spark worker's JVM.
- Calling `System.setProperty()` from the worker program sets the value only after it has already been read by `MongoConnector`, so it has no effect.
- Passing the command line option `-Dspark.mongodb.keep_alive_ms` to the Spark option `spark.driver.extraJavaOptions` again only sets the value in the driver's JVM.
- Passing the command line option to the Spark option `spark.executor.extraJavaOptions` results in an error message from Spark:

```
Exception in thread "main" java.lang.Exception: spark.executor.extraJavaOptions is not allowed to set Spark options (was '-Dspark.mongodb.keep_alive_ms=desiredValue'). Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit.
```

The Spark code that throws this error is located in `org.apache.spark.SparkConf#validateSettings`, where it checks for any worker option value that contains the string `-Dspark`.

This seems like an oversight in the design of the Mongo connector; either the property should be set through the Spark session (as I originally expected it to be), or it should be renamed to something that doesn't start with `spark`.

If I am misunderstanding any of this, I would love some guidance on how to properly set the property.

On Friday, April 26, 2019 at 11:19:21 AM UTC+2, Ross Lawley wrote: