Exception in thread "DAGScheduler" java.lang.IllegalArgumentException: Shuffle ID 71056 registered twice


seanm

Apr 2, 2013, 1:54:01 AM
to spark...@googlegroups.com
I'm running a Spark Streaming job and occasionally run into this after it has been running for a few hours. I don't see any exceptions in the UI. Does anyone know what I can do to troubleshoot further?


13/04/02 03:32:47 ERROR cluster.TaskSetManager: Task 90304.0:2 failed more than 4 times; aborting job
13/04/02 03:32:47 ERROR streaming.JobManager: Running streaming job 7852 @ 1364873520000 ms failed
spark.SparkException: Job failed: Task 90304.0:2 failed more than 4 times
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
        at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:303)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
        at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
13/04/02 03:33:45 ERROR cluster.TaskSetManager: Task 90321.0:3 failed more than 4 times; aborting job
13/04/02 03:33:45 ERROR streaming.JobManager: Running streaming job 7853 @ 1364873525000 ms failed
spark.SparkException: Job failed: Task 90321.0:3 failed more than 4 times
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:642)
        at spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:640)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:640)
        at spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:303)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:364)
        at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)
13/04/02 03:33:45 ERROR cluster.TaskSetManager: Task 90342.0:2 failed more than 4 times; aborting job
13/04/02 03:33:45 ERROR streaming.JobManager: Running streaming job 7854 @ 1364873530000 ms failed
spark.SparkException: Job failed: Task 90342.0:2 failed more than 4 times

...

Exception in thread "DAGScheduler" java.lang.IllegalArgumentException: Shuffle ID 71056 registered twice
        at spark.MapOutputTracker.registerShuffle(MapOutputTracker.scala:81)
        at spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:151)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:135)
        at spark.scheduler.DAGScheduler$$anonfun$visit$2$1.apply(DAGScheduler.scala:196)
        at spark.scheduler.DAGScheduler$$anonfun$visit$2$1.apply(DAGScheduler.scala:193)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
        at scala.collection.immutable.List.foreach(List.scala:76)
        at spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:193)
        at spark.scheduler.DAGScheduler$$anonfun$visit$2$1.apply(DAGScheduler.scala:201)
        at spark.scheduler.DAGScheduler$$anonfun$visit$2$1.apply(DAGScheduler.scala:193)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
        at scala.collection.immutable.List.foreach(List.scala:76)
        at spark.scheduler.DAGScheduler.visit$2(DAGScheduler.scala:193)
        at spark.scheduler.DAGScheduler$$anonfun$visit$2$1.apply(DAGScheduler.scala:201)
        at spark.scheduler.DAGScheduler$$anonfun$visit$2$1.apply(DAGScheduler.scala:193)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
...

        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$getMissingParentStages(DAGScheduler.scala:207)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:420)
        at spark.scheduler.DAGScheduler$$anonfun$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:428)
        at spark.scheduler.DAGScheduler$$anonfun$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:427)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
        at scala.collection.immutable.List.foreach(List.scala:76)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:427)
        at spark.scheduler.DAGScheduler$$anonfun$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:428)
        at spark.scheduler.DAGScheduler$$anonfun$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:427)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
        at scala.collection.immutable.List.foreach(List.scala:76)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:427)
        at spark.scheduler.DAGScheduler$$anonfun$submitWaitingStages$6.apply(DAGScheduler.scala:344)
        at spark.scheduler.DAGScheduler$$anonfun$submitWaitingStages$6.apply(DAGScheduler.scala:343)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
        at spark.scheduler.DAGScheduler.submitWaitingStages(DAGScheduler.scala:343)
        at spark.scheduler.DAGScheduler.spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:378)
        at spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:107)


vivace

Dec 30, 2013, 8:23:29 PM
to spark...@googlegroups.com
I am also encountering this. Any ideas?

vivace

Jan 2, 2014, 1:28:28 PM
to spark...@googlegroups.com
I seem to have gotten over this issue by making sure all of my state streams were checkpointed. 
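A minimal sketch of that fix, assuming the org.apache.spark.streaming API of the releases current at the time of these later replies (older releases, like the one in the stack traces above, used the bare spark.streaming package). The socket source, update function, and checkpoint paths below are illustrative placeholders, not details from this job:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext("spark://master:7077", "StateExample", Seconds(5))

// Checkpoint directory for DStream metadata, required for stateful operations.
ssc.checkpoint("hdfs://namenode:8020/checkpoints/state-example")

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// A running count per word, kept as streaming state.
val counts = words.map((_, 1L)).updateStateByKey[Long] {
  (batchValues: Seq[Long], state: Option[Long]) =>
    Some(batchValues.sum + state.getOrElse(0L))
}

// Checkpoint the state stream itself so its RDD lineage is truncated
// periodically instead of growing with every batch.
counts.checkpoint(Seconds(30))

counts.print()
ssc.start()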


Tathagata Das

Jan 2, 2014, 2:33:06 PM
to spark...@googlegroups.com
Spark periodically clears RDD and shuffle metadata that is older than the TTL. If any active RDDs (i.e., RDDs being used in current jobs) still refer to metadata older than the TTL, that cleanup can give rise to inconsistencies and race conditions, which may lead to this error. In the case of Spark Streaming, checkpointing prevents the lineage of RDDs from growing too long, so that new RDDs don't depend on very old RDDs and the old metadata can be cleared safely. Hence, running the state streams without checkpointing led to this error.

TD



matthew Drescher

Jan 2, 2014, 2:35:46 PM
to spark...@googlegroups.com
What is a good duration to set the checkpoint interval to?



Tathagata Das

Jan 2, 2014, 5:14:52 PM
to spark...@googlegroups.com
Checkpointing the RDDs has some cost: you will notice that batches in which RDDs are checkpointed take slightly longer to run. Hence, checkpointing too frequently will increase the average time to process each batch (i.e., higher average latency). On the other hand, checkpointing too infrequently also leads to a slow increase in latency as RDD lineages keep growing. So a "good" checkpointing interval depends heavily on the batch interval and the workload, and you have to do a bit of experimenting to find an acceptable value. My recommendation would be to start with something small (typically no less than 10 seconds) and increase the interval to see if latency improves.

I forgot to mention this in my previous post: spark.cleaner.ttl must be set to a value higher than the checkpoint interval of any DStream. Otherwise, the errors above may occur.
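As an illustration of that constraint (the values below are made up, and this uses the old style of setting Spark properties as JVM system properties before the context is created; later releases use SparkConf instead):

// Largest checkpoint interval used by any DStream in the app, in seconds.
val maxCheckpointIntervalSec = 50

// TTL for periodic metadata cleaning, in seconds. It must comfortably exceed
// every checkpoint interval, or metadata may be cleared while still referenced.
val cleanerTtlSec = 3600

require(cleanerTtlSec > maxCheckpointIntervalSec,
  "spark.cleaner.ttl must be larger than every DStream checkpoint interval")

System.setProperty("spark.cleaner.ttl", cleanerTtlSec.toString)
// ... then create the StreamingContext and DStreams as usual.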

TD

matthew Drescher

Jan 2, 2014, 7:50:29 PM
to spark...@googlegroups.com
OK, I am now experiencing this thing where the job just kind of stops after a while.
When I make the checkpoint interval smaller and spark.cleaner.ttl higher, it runs for a lot longer, but it still seems to stop eventually, at a later point roughly in proportion to the change.

Any ideas?

Tathagata Das

Jan 3, 2014, 2:03:04 PM
to spark...@googlegroups.com
Does it stop with an error? And after how long? Can you give an idea of the different values of checkpoint interval and TTL you tried, and how long it takes to stop?
Also, please enable INFO-level logs to see what's going on.