We are facing an issue where our disks are filling up: the shuffle files generated during job runs are consuming the disk space. The more data we process, the more disk is consumed; more than 80 GB of disk is being used. We can't live with such high disk usage, as disk space is limited. We are using the external shuffle service, and the split size and the number of rows read per page from Cassandra are both set to 2000, in order to support multiple scenarios in parallel.
Please advise how we can reduce the number of shuffle files written and save disk space. We are using Spark 1.2.0. (A sketch of the shuffle-related settings we have been looking at follows the stack trace below.)
Full error stack trace is:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 70919 in stage 0.0 failed 4 times, most recent failure: Lost task 70919.3 in stage 0.0 (TID 71106, ######): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at org.apache.spark.shuffle.IndexShuffleBlockManager$$anonfun$writeIndexFile$1.apply$mcVJ$sp(IndexShuffleBlockManager.scala:93)
at org.apache.spark.shuffle.IndexShuffleBlockManager$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockManager.scala:91)
at org.apache.spark.shuffle.IndexShuffleBlockManager$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockManager.scala:91)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:168)
at org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:91)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
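For reference, here is a minimal sketch of the Spark 1.2 settings that bear on shuffle disk usage; nothing here is specific to our job, and the application name and paths are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-analytics") // placeholder app name
  // Sort-based shuffle (the default since 1.2) writes one data file plus
  // one index file per map task, far fewer files than hash shuffle.
  .set("spark.shuffle.manager", "sort")
  // Compress shuffle output and spill files (both already default to true;
  // shown explicitly here).
  .set("spark.shuffle.compress", "true")
  .set("spark.shuffle.spill.compress", "true")
  // Spread shuffle files across larger volumes (placeholder paths).
  .set("spark.local.dir", "/data1/spark-tmp,/data2/spark-tmp")
val sc = new SparkContext(conf)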
Thanks,
Ashish
Thanks for your answer.
Actually, we are using the countByValue operation to group different record sets, which in turn increases the number of shuffle files.
We have also seen a correlation with the number of partitions: as we increase spark.cassandra.input.split.size from 2000 to 5000 and beyond, we see a decrease in disk usage. (A rough sketch of an alternative on the Spark side follows.)
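Since raising the split size is not always an option for us (see the paging limitation below), one thing we are considering is coalescing before the wide operation. A rough sketch with placeholder numbers; coalesce without a shuffle merges the many small input partitions, so fewer upstream tasks run, and fewer shuffle files get written if the operation shuffles:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("count-sketch"))
// Stand-in for our Cassandra RDD: many small partitions.
val rdd = sc.parallelize(1 to 1000000, 2000)
val counts = rdd
  .coalesce(200)    // placeholder target partition count
  .countByValue()   // same grouping, run over far fewer partitions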
But there is a limitation: we are reading Thrift data (written from native protocol v1, and "PagingState instances are not portable across native protocol versions"), which causes issues when records are paged; see the link below:
https://datastax.github.io/java-driver/2.0.12/features/paging/
To overcome that issue we have kept the two properties below at the same value (a sketch of the full configuration follows the list):
spark.cassandra.input.page.row.size=2000
spark.cassandra.input.split.size=2000
and
spark.cassandra.input.consistency.level = LOCAL_QUORUM
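A sketch of that workaround configuration; the host, keyspace and table names are placeholders. Keeping the page size equal to the split size means, as we understand it, that each split is fetched in a single page, so the driver never has to resume from a PagingState:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1") // placeholder
  .set("spark.cassandra.input.page.row.size", "2000")
  .set("spark.cassandra.input.split.size", "2000")
  .set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
val sc = new SparkContext(conf)
val rows = sc.cassandraTable("my_keyspace", "my_table") // placeholder names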
This has fixed our issue of count mismatch, but disk usage is becoming considerably high. Please advise.
Adding more disk space does not seem to be a short-term solution to this issue, as 80 GB of disk is being used per node (we are running Spark on a multi-node cluster).
Thanks,
Ashish
I hope we now have a solution to all the problems. The row-count issue with CQL seems to be fixed in Cassandra 2.1.14 and Cassandra 2.2.6.
Details here: https://issues.apache.org/jira/browse/CASSANDRA-11467 (Paging loses rows in certain conditions)
And I believe we can now live with the default values of the properties below; it was overriding them with small values that was creating lots of shuffle files (a quick sketch follows the list):
spark.cassandra.input.page.row.size=
spark.cassandra.input.split.size=
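With the two overrides simply omitted, the connector falls back to its defaults, which should give far fewer (and larger) Spark partitions, and hence far fewer shuffle files. A quick sketch, with placeholder host/keyspace/table names:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1") // placeholder
  .set("spark.cassandra.input.consistency.level", "LOCAL_QUORUM")
  // no page.row.size / split.size overrides any more
val sc = new SparkContext(conf)
val rows = sc.cassandraTable("my_keyspace", "my_table")
println(s"partitions with default split size: ${rows.partitions.length}")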
Thanks,
Ashish