Regarding Spark Cassandra Metrics - Spark 2.3.0 Scala 2.11 Connector com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

Yogesh Kumar Garg

Feb 1, 2022, 1:19:33 AM
to DataStax Spark Connector for Apache Cassandra
I am developing a Spark application that loads data into Cassandra using the Spark Cassandra Connector. I built a fat JAR with all the dependencies and submit it with spark-submit, using yarn as the master and client as the deploy mode.

The data loads into Cassandra successfully, but I cannot get any metrics from the Spark Cassandra Connector. In the executor logs I see that the metrics sources configured by the following properties fail to initialize with the error shown below.

Properties:
"spark.metrics.conf.driver.source.cassandra-connector.class": "org.apache.spark.metrics.CassandraConnectorSource"
"spark.metrics.conf.executor.source.cassandra-connector.class": "org.apache.spark.metrics.CassandraConnectorSource"
Error:

22/01/28 15:30:55 ERROR MetricsSystem: Source class org.apache.spark.metrics.CassandraConnectorSource cannot be instantiated
java.lang.ClassNotFoundException: org.apache.spark.metrics.CassandraConnectorSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSources$1.apply(MetricsSystem.scala:182)
at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSources$1.apply(MetricsSystem.scala:179)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at org.apache.spark.metrics.MetricsSystem.registerSources(MetricsSystem.scala:179)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:101)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:364)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:200)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:228)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
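
The ClassNotFoundException above indicates that the class loader used at that point could not find the connector's metrics source class. A minimal Java sketch of a check that reports whether a class name is visible to the current thread's class loader follows; ClassVisibilityCheck and isVisible are hypothetical names introduced for illustration and are not part of Spark or the connector.

```java
class ClassVisibilityCheck {

    /**
     * Returns true if the named class can be found by the current thread's
     * context class loader (falling back to this class's own loader).
     * The class is located but not initialized.
     */
    static boolean isVisible(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassVisibilityCheck.class.getClassLoader();
        }
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the error message above.
        String source = "org.apache.spark.metrics.CassandraConnectorSource";
        System.out.println(source + " visible: " + isVisible(source));
    }
}
```

The result only reflects the class loader that is active where the check runs, so a check executed later in the application's life may give a different answer than the lookup that failed at executor startup.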

I load the data into Cassandra with the following code:

cassandraTableDataset.toDF(cassandraTable.getRenamedColumns())
    .write()
    .format(sparkCassandraFormat)
    .options(ImmutableMap.of(
        cassandraKeyspaceString, getKeyspace(config),
        cassandraTableString, getTable(config)))
    .mode(SaveMode.Append)
    .save();

I cannot copy the Spark Cassandra Connector JAR to all the nodes in the cluster because of some restrictions.

Solutions tried:

Solution 1: 

I used the spark.jars and spark.executor.extraClassPath options, but that did not work, because the executor's Spark session is created before these JARs or the fat application JAR are fetched/copied to the executor node.

Solution 2:

I tried to manually instantiate the org.apache.spark.metrics.CassandraConnectorSource class and register it with the SparkEnv metrics system just before loading the data into Cassandra, but that did not work either. I assume these changes take effect on the driver only.

Solution 3:

I also tried setting the same properties with sparkEnvironment.getSparkSession().conf().set(), hoping this would change the executor configuration at runtime, but it did not work either. I do not know whether these properties can be added at runtime at all.
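
For context on the property names: Spark's metrics configuration picks up SparkConf entries whose keys start with spark.metrics.conf. and uses the remainder of the key in the same form as an entry in a metrics.properties file. A minimal sketch of that mapping for the two properties listed earlier follows; MetricsKeySketch and toMetricsKey are hypothetical names for illustration, not Spark APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MetricsKeySketch {

    // Prefix that marks a SparkConf entry as metrics configuration.
    static final String PREFIX = "spark.metrics.conf.";

    /** Strips the prefix, leaving the key as it would appear in metrics.properties. */
    static String toMetricsKey(String sparkConfKey) {
        if (!sparkConfKey.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a metrics property: " + sparkConfKey);
        }
        return sparkConfKey.substring(PREFIX.length());
    }

    public static void main(String[] args) {
        // The two properties from the post, in insertion order.
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.metrics.conf.driver.source.cassandra-connector.class",
                 "org.apache.spark.metrics.CassandraConnectorSource");
        conf.put("spark.metrics.conf.executor.source.cassandra-connector.class",
                 "org.apache.spark.metrics.CassandraConnectorSource");

        // Print each entry in metrics.properties form (key=value).
        conf.forEach((key, value) -> System.out.println(toMetricsKey(key) + "=" + value));
    }
}
```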


Spark Version: 2.3.0
Scala Version: 2.11
Spark Cassandra Connector: com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

Please help with this issue, as these metrics are important. Thanks in advance.