Severe java.io.IOException: No space left on device, Shuffle files are eating disk space


ashu.shri

Apr 4, 2016, 5:04:20 AM
to DataStax Spark Connector for Apache Cassandra
Hi Experts,

We are facing an issue where our disk is getting full: the shuffle files generated during job runs are filling up the disk. The more data we process, the more disk is consumed, in some runs more than 80 GB. We can't live with such high disk usage, as disk space is limited. We are using the external shuffle service, and the split size and the number of rows read from Cassandra per page are both set to 2000 in order to support multiple scenarios in parallel.

Please advise how we can reduce the number of shuffle files written and save disk space. We are using Spark 1.2.0.
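For reference, a minimal sketch of how these settings are typically applied in code (the property names come from this thread; the application name and overall structure are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative setup matching the settings described above.
val conf = new SparkConf()
  .setAppName("cassandra-job")                        // hypothetical application name
  .set("spark.shuffle.service.enabled", "true")       // external shuffle service
  .set("spark.cassandra.input.split.size", "2000")    // approx. Cassandra rows per Spark partition
  .set("spark.cassandra.input.page.row.size", "2000") // rows fetched from Cassandra per page
val sc = new SparkContext(conf)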

Full error stack trace is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 70919 in stage 0.0 failed 4 times, most recent failure: Lost task 70919.3 in stage 0.0 (TID 71106, ######): java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at org.apache.spark.shuffle.IndexShuffleBlockManager$$anonfun$writeIndexFile$1.apply$mcVJ$sp(IndexShuffleBlockManager.scala:93)
at org.apache.spark.shuffle.IndexShuffleBlockManager$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockManager.scala:91)
at org.apache.spark.shuffle.IndexShuffleBlockManager$$anonfun$writeIndexFile$1.apply(IndexShuffleBlockManager.scala:91)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:168)
at org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:91)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,
Ashish

Russell Spitzer

Apr 4, 2016, 12:38:19 PM
to DataStax Spark Connector for Apache Cassandra
Every shuffle that spills will require disk space, so I would see if you can avoid so much shuffling. Perhaps you can cache results after joining or repartitioning? That way you only have a single set of shuffle files rather than many. Outside of that, you most likely just need more space.

I would preferably just add more machines with more disk space.
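A rough sketch of that caching idea, using toy data in place of the real RDDs:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on Spark 1.2)
import org.apache.spark.storage.StorageLevel

def cacheAfterJoin(sc: SparkContext): Unit = {
  // Toy pair RDDs standing in for the real data.
  val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
  val right = sc.parallelize(Seq(1 -> "x", 2 -> "y"))

  // Persist the joined result so the join's shuffle runs once and later
  // actions read the cached partitions instead of re-shuffling.
  val joined = left.join(right).persist(StorageLevel.MEMORY_AND_DISK)

  joined.count()   // the shuffle happens here
  joined.take(10)  // no new shuffle; served from the cached result
}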


ashu.shri

Apr 5, 2016, 5:00:27 AM
to DataStax Spark Connector for Apache Cassandra
Hi Russell,

Thanks for your answer.

Actually we are using a countByValue operation to group different record sets, which in turn increases the number of shuffle files.
We have also seen a correlation with the number of partitions: as we increase spark.cassandra.input.split.size from 2000 to 5000 and further, we see a decrease in disk usage.
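For illustration, a sketch of the kind of countByValue usage described here, assuming an existing SparkContext sc with the connector on the classpath (keyspace, table, and column names are made up):

import com.datastax.spark.connector._  // adds cassandraTable to SparkContext

// countByValue groups identical values together before counting; that grouping
// is what this thread attributes the extra shuffle files to.
val typeCounts = sc.cassandraTable("my_keyspace", "my_table")
  .map(_.getString("record_type"))
  .countByValue()  // returns a value -> count map on the driver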

But there is a limitation: we are reading Thrift data (written via native protocol v1, and "PagingState instances are not portable across native protocol versions"), which causes issues when records are paged; see the link below:
https://datastax.github.io/java-driver/2.0.12/features/paging/

To overcome that issue we have kept both of the properties below at the same value:
spark.cassandra.input.page.row.size=2000
spark.cassandra.input.split.size=2000
and
spark.cassandra.input.consistency.level = LOCAL_QUORUM

This has fixed our count mismatch issue, but disk usage is becoming considerably high. Please advise.

Adding more disk space does not seem to be a viable short-term fix, as 80 GB of disk is being used per node (we are running Spark on a multi-node cluster).

Thanks,
Ashish

Russell Spitzer

Apr 5, 2016, 9:48:03 AM
to DataStax Spark Connector for Apache Cassandra
Any cogroup without a common partitioner will require a shuffle, *including* a cogroup of an RDD with itself. If you can minimize those it will really help out. I'm not sure why input.split.size has anything to do with your thrift issue. The input.split.size property controls the size of the Spark partition; it does not change the behavior of the driver. The page.row.size property should affect the driver paging.
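A small sketch of the common-partitioner idea, with toy data and an illustrative partition count:

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits (needed on Spark 1.2)

def cogroupWithoutExtraShuffle(sc: SparkContext): Unit = {
  val a = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
  val b = sc.parallelize(Seq(1 -> "x", 2 -> "y"))

  // Partition both sides the same way once, and keep that layout cached.
  val part  = new HashPartitioner(200)
  val aPart = a.partitionBy(part).cache()
  val bPart = b.partitionBy(part).cache()

  // Because both RDDs already share the partitioner, cogroup can reuse the
  // existing layout instead of shuffling again.
  val grouped = aPart.cogroup(bPart)
  grouped.count()
}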

The underlying problem is still that your partitions are spilling onto disk during your shuffles (it could be the countByValue) and there is really no way around this. The data has to spill to files if it doesn't fit in memory.


ashu.shri

Apr 8, 2016, 4:48:57 AM
to DataStax Spark Connector for Apache Cassandra
Hello,

I hope we now have a solution to all of the problems. The row count issue with CQL seems to be fixed in Cassandra 2.1.14 and Cassandra 2.2.6.

Please see the details here: https://issues.apache.org/jira/browse/CASSANDRA-11467 (Paging loses rows in certain conditions)

And I believe we can now live with the default values of the properties below; overriding them to low values was in turn creating lots of shuffle files.

spark.cassandra.input.page.row.size=
spark.cassandra.input.split.size=

Thanks,
Ashish
