I am trying to save an RDD to Cassandra but I am running into the following error:
[{'key': 3, 'value': 'foobar'}]
[Stage 9:> (0 + 2) / 2]
[Stage 9:=============================> (1 + 1) / 2]WARN 2016-05-02 17:23:55,240 org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 9.0 (TID 11, 10.0.6.200): java.lang.NullPointerException
at com.datastax.bdp.spark.python.RDDPythonFunctions.com$datastax$bdp$spark$python$RDDPythonFunctions$$toCassandraRow(RDDPythonFunctions.scala:57)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:155)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:139)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ERROR 2016-05-02 17:23:55,406 org.apache.spark.scheduler.TaskSetManager: Task 1 in stage 9.0 failed 4 times; aborting job
Traceback (most recent call last):
File "/home/ubuntu/test-spark.py", line 50, in <module>
main()
File "/home/ubuntu/test-spark.py", line 47, in main
runner.run()
File "/home/ubuntu/spark_common.py", line 62, in run
self.save_logs_to_cassandra()
File "/home/ubuntu/spark_common.py", line 142, in save_logs_to_cassandra
rdd.saveToCassandra(keyspace, tablename)
File "/usr/share/dse/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2313, in saveToCassandra
File "/usr/share/dse/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/share/dse/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o149.saveToCassandra.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 4 times, most recent failure: Lost task 1.3 in stage 9.0 (TID 14, 10.0.6.200): java.lang.NullPointerException
at com.datastax.bdp.spark.python.RDDPythonFunctions.com$datastax$bdp$spark$python$RDDPythonFunctions$$toCassandraRow(RDDPythonFunctions.scala:57)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at com.datastax.bdp.spark.python.RDDPythonFunctions$$anonfun$toCassandraRows$1.apply(RDDPythonFunctions.scala:73)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at com.datastax.spark.connector.util.CountingIterator.next(CountingIterator.scala:16)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:155)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:109)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:139)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:139)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1276)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1266)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1460)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1421)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
The Python code looks like this:
rdd = self.context.parallelize([{'key': 3, 'value': 'foobar'}])
print rdd.collect()
rdd.saveToCassandra("test", "dummy")
I am using DSE 4.8.6 which runs Spark 1.4.2
I ran through a bunch of existing posts on this mailing list and have already performed the following routines:
* Ensure that there is no redundant cassandra .jar lying around, interfering with the process.
* Wiped clean and reinstalled DSE to ensure that.
* Tried loading data from Cassandra to ensure that Spark <-> Cassandra communication is working. I used print self.context.cassandraTable(keyspace='test', table='dummy').collect() to validate that.
* Ensure there are no null values in my dataset that is being written.
* Verified that the keyspace and the table exist in Cassandra using cassandra@cqlsh> SELECT * from test.dummy ;
--
You received this message because you are subscribed to the Google Groups "DataStax Spark Connector for Apache Cassandra" group.
To unsubscribe from this group and stop receiving emails from it, send an email to spark-connector-...@lists.datastax.com.