java.lang.NullPointerException while performing rdd.saveToCassandra

Piyush Verma

May 2, 2016, 1:37:20 PM
to DataStax Spark Connector for Apache Cassandra
Hi,

I am trying to save an RDD to Cassandra, but I am running into the following error:

[{'key': 3, 'value': 'foobar'}]

[Stage 9:> (0 + 2) / 2]
[Stage 9:=============================> (1 + 1) / 2]WARN 2016-05-02 17:23:55,240 org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 9.0 (TID 11, 10.0.6.200): java.lang.NullPointerException
at com.datastax.bdp.spark.python.RDDPythonFunctions.com$datastax$bdp$spark$python$RDDPythonFunctions$$toCassandraRow(RDDPythonFunctions.scala:57)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:155)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:139)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

ERROR 2016-05-02 17:23:55,406 org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 9.0 failed 4 times; aborting job
Traceback (most recent call last):
File "/home/ubuntu/test-spark.py", line 50, in <module>
main()
File "/home/ubuntu/test-spark.py", line 47, in main
runner.run()
File "/home/ubuntu/spark_common.py", line 62, in run
self.save_logs_to_cassandra()
File "/home/ubuntu/spark_common.py", line 142, in save_logs_to_cassandra
rdd.saveToCassandra(keyspace, tablename)
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2313, in saveToCassandra
File "/usr/share/dse/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/share/dse/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o149.saveToCassandra.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 4 times, most recent failure: Lost task 1.3 in stage 9.0 (TID 14, 10.0.6.200): java.lang.NullPointerException
at com.datastax.bdp.spark.python.RDDPythonFunctions.com$datastax$bdp$spark$python$RDDPythonFunctions$$toCassandraRow(RDDPythonFunctions.scala:57)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:155)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:139)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1276)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1266)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1460)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1421)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

The Python code looks like this:
rdd = self.context.parallelize([{'key': 3, 'value': 'foobar'}])
print rdd.collect()
rdd.saveToCassandra("test", "dummy")

I am using DSE 4.8.6, which runs Spark 1.4.2.

I went through a number of existing posts on this mailing list and have already tried the following:

* Ensured that there is no redundant Cassandra .jar lying around, interfering with the process.
* Wiped clean and reinstalled DSE to be sure of that.
* Loaded data from Cassandra to verify that Spark <-> Cassandra communication works; I used print self.context.cassandraTable(keyspace='test', table='dummy').collect() to validate that.
* Ensured there are no null values in the dataset being written.
* Verified that the keyspace and table exist in Cassandra, using cassandra@cqlsh> SELECT * FROM test.dummy ; (a rough sketch of the schema follows below).
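
For reference, the table was created with roughly the schema below (reconstructed from memory, so the exact definition may differ):

    cassandra@cqlsh> CREATE TABLE test.dummy (
                 ...     key int PRIMARY KEY,
                 ...     value text
                 ... );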

Russell Spitzer

May 2, 2016, 1:51:56 PM
to DataStax Spark Connector for Apache Cassandra
You may want to ping the author of PysparkCassandra, since that functionality isn't part of the Connector by default. But from what I can see, you are trying to write a dictionary into a C* row, and I'm not sure whether the PysparkCassandra module will automatically transform that into a Row object. So the failure may be because the resulting RDD[Map[String,String]] cannot be written to C*.
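
A quick way to test that hypothesis would be to make the row shape explicit before saving, e.g. (an untested sketch on my end, using the stock pyspark Row type in place of a plain dict):

    from pyspark.sql import Row

    # Map each dict to a named Row so the column mapping is explicit
    # rather than left for the writer to infer.
    rows = rdd.map(lambda d: Row(**d))
    rows.saveToCassandra("test", "dummy")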

Piyush Verma

May 2, 2016, 2:17:02 PM
to spark-conn...@lists.datastax.com
Yeah, I had guessed that too. So I tried transforming it into a Row before saving to Cassandra, but that didn't matter. I checked the source code; it does the translation to a Row on my behalf if I don't. Also, this is pretty much an example straight off their documentation, and it seems to work with 1.2.x.

I will get in touch with PysparkCassandra's author. Thanks for the tip.

Piyush Verma

May 2, 2016, 2:35:28 PM
to DataStax Spark Connector for Apache Cassandra
Hey, do you know the URL of the project?
The pyspark.zip that ships along with DSE has no mention of the project URL.

Russell Spitzer

May 2, 2016, 3:18:37 PM
to spark-conn...@lists.datastax.com
DSE doesn't ship with pyspark-cassandra; aren't you getting it from https://github.com/TargetHolding/pyspark-cassandra? DSE only supports DataFrames from pyspark.
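
For the DataFrame route, the write looks roughly like the sketch below on Spark 1.4 (option names as in the open-source connector docs; adjust to your setup):

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)
    df = sqlContext.createDataFrame([(3, 'foobar')], ['key', 'value'])
    # The connector exposes C* tables through its DataSource name.
    df.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(keyspace="test", table="dummy") \
        .save(mode="append")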

Piyush Verma

May 2, 2016, 3:24:12 PM
to spark-conn...@lists.datastax.com
I don’t think I have added any driver. Besides, the misbehaving code is initiated from /usr/share/dse/spark/python/lib/pyspark.zip, which DSE seems to have shipped.

Russell Spitzer

May 2, 2016, 3:35:53 PM
to spark-conn...@lists.datastax.com
That functionality is officially deprecated in 4.8; I thought we had completely removed it. I would use the TargetHolding code or DataFrames.
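
If you go the TargetHolding route, the rough shape is below (a sketch, not tested against your cluster; substitute the current version from spark-packages.org for <version>, and your own node address for the host):

    # Launch with the package on the classpath, e.g.:
    #   spark-submit --packages TargetHolding:pyspark-cassandra:<version> app.py

    from pyspark import SparkConf
    from pyspark_cassandra import CassandraSparkContext

    conf = SparkConf().set("spark.cassandra.connection.host", "10.0.6.200")
    sc = CassandraSparkContext(conf=conf)
    # pyspark-cassandra's RDDs accept dicts directly in saveToCassandra.
    sc.parallelize([{'key': 3, 'value': 'foobar'}]).saveToCassandra('test', 'dummy')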