Spark Streaming with Tachyon - I still get BlockNotFoundException!


Dibyendu Bhattacharya

May 8, 2015, 2:06:47 AM
to tachyo...@googlegroups.com
Dear all,

I have been playing with Spark Streaming on Tachyon as the OFF_HEAP block store. My primary reason for evaluating Tachyon is to find out whether it can solve the Spark BlockNotFoundException.

With the traditional Spark MEMORY_ONLY StorageLevel, jobs fail with a block-not-found exception when blocks are evicted, and storing blocks with MEMORY_AND_DISK is not a good option either, as it hurts throughput a lot.

To test how Tachyon behaves, I took the latest Spark 1.4 from master, used Tachyon 0.6.4, and configured Tachyon in fault-tolerant mode. Tachyon is running on a 3-node AWS x-large cluster, and Spark is running on a 3-node AWS x-large cluster.

I used the low-level receiver-based Kafka consumer (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer), which I wrote, to pull from Kafka and write blocks to Tachyon.

I found throughput similar to the MEMORY_ONLY case and very good overall memory utilization (as it is an off-heap store).

But I found one issue on which I need clarification.

With Tachyon I also get BlockNotFoundException. From what I can see, TachyonBlockManager.scala writes the blocks with the WriteType.TRY_CACHE configuration. Because of this, blocks are evicted from the Tachyon cache, and when Spark tries to find such a block it throws BlockNotFoundException.
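
To make the failure mode concrete, here is a minimal Python sketch of a cache-only store, assuming plain LRU eviction and no copy of an evicted block anywhere else. The names (CacheOnlyStore, BlockNotFound, simulate) and the block IDs are made up for illustration; this is a toy model of the behaviour described above, not Tachyon's or Spark's actual code.

```python
from collections import OrderedDict


class BlockNotFound(Exception):
    """Toy stand-in for the error seen when an evicted block is requested."""


class CacheOnlyStore:
    """Hypothetical single-tier store: evicted blocks are dropped, never persisted."""

    def __init__(self, capacity_blocks):
        self.capacity_blocks = capacity_blocks
        self.blocks = OrderedDict()  # oldest entry first, so it doubles as LRU order

    def put(self, block_id, data):
        self.blocks[block_id] = data
        self.blocks.move_to_end(block_id)
        while len(self.blocks) > self.capacity_blocks:
            self.blocks.popitem(last=False)  # drop the least recently used block

    def get(self, block_id):
        if block_id not in self.blocks:
            raise BlockNotFound(block_id)
        self.blocks.move_to_end(block_id)
        return self.blocks[block_id]


def simulate(capacity_blocks, blocks_written_before_read):
    """Write N blocks, then read the first one, as a lagging job would."""
    store = CacheOnlyStore(capacity_blocks)
    for i in range(blocks_written_before_read):
        store.put(f"input-0-{i}", b"payload")
    try:
        store.get("input-0-0")
        return "found"
    except BlockNotFound:
        return "lost"
```

With room for 3 blocks, simulate(3, 3) finds the first block, while simulate(3, 4) loses it: one extra write before the read is enough to evict it.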

I see a pull request that discusses the same issue.

When I changed the WriteType to CACHE_THROUGH, the BlockNotFoundException was gone, but throughput suffered again.

I am curious whether Tachyon has any setting that handles block eviction from cache to disk, other than explicitly setting CACHE_THROUGH?

Regards, 
Dibyendu

Dibyendu Bhattacharya

May 8, 2015, 11:38:13 AM
to tachyo...@googlegroups.com
Just a follow-up on this thread.

I tried Hierarchical Storage on Tachyon (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that seems to have worked: I did not see any Spark job fail due to BlockNotFoundException. Below are my Hierarchical Storage settings:

  -Dtachyon.worker.hierarchystore.level.max=2
  -Dtachyon.worker.hierarchystore.level0.alias=MEM
  -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
  -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
  -Dtachyon.worker.hierarchystore.level1.alias=HDD
  -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
  -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
  -Dtachyon.worker.allocate.strategy=MAX_FREE
  -Dtachyon.worker.evict.strategy=LRU
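
As a rough illustration of why a second tier helps, the sketch below models a two-tier store in Python in which blocks evicted from MEM are demoted to HDD rather than dropped. It is a simplified model under that assumption, with capacities counted in blocks instead of bytes; TieredStore and its methods are hypothetical names, not Tachyon APIs.

```python
from collections import OrderedDict


class TieredStore:
    """Hypothetical two-tier store: LRU eviction from MEM demotes blocks to HDD."""

    def __init__(self, mem_blocks, hdd_blocks):
        self.mem_blocks = mem_blocks
        self.hdd_blocks = hdd_blocks
        self.mem = OrderedDict()  # oldest entry first
        self.hdd = OrderedDict()

    def put(self, block_id, data):
        self.mem[block_id] = data
        self.mem.move_to_end(block_id)
        # MEM is full: move the least recently used block down one tier.
        while len(self.mem) > self.mem_blocks:
            victim, payload = self.mem.popitem(last=False)
            self.hdd[victim] = payload
        # Only when the last tier overflows is a block really gone.
        while len(self.hdd) > self.hdd_blocks:
            self.hdd.popitem(last=False)

    def tier_of(self, block_id):
        """Return "MEM", "HDD", or None if the block no longer exists."""
        if block_id in self.mem:
            return "MEM"
        if block_id in self.hdd:
            return "HDD"
        return None
```

With TieredStore(mem_blocks=2, hdd_blocks=3), writing five blocks leaves the first one readable from HDD; it disappears only once a sixth block is written, that is, once MEM + HDD together are exhausted.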

Regards, 
Dibyendu

Bin Fan

May 8, 2015, 1:31:44 PM
to Dibyendu Bhattacharya, tachyo...@googlegroups.com
Hi Dibyendu, 

Thanks for sharing the solution. It looks to me like you are essentially increasing the capacity of Tachyon storage by adding a second storage tier on hard disk.
This way, block eviction is eliminated. Very interesting.

- Bin

--
You received this message because you are subscribed to the Google Groups "Tachyon Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to tachyon-user...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Calvin Jia

May 8, 2015, 2:02:06 PM
to tachyo...@googlegroups.com, fanb...@gmail.com, dibyendu.b...@gmail.com
Hi,

I think Bin is right: by increasing the total Tachyon-managed space, another 50GB buffer was added. Was the total size of your data set less than (MEM + HDD) per worker?

Thanks,
Calvin

Dibyendu Bhattacharya

May 8, 2015, 2:20:42 PM
to Calvin Jia, fanb...@gmail.com, tachyo...@googlegroups.com

For this test, yes. The total size of my data is less than what is allocated for HDD + memory. It also depends on how fast Spark Streaming processes the blocks. With such a large Tachyon buffer, my processing logic will not lag that far behind the incoming data rate. So it worked for me.
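
The dependence on processing speed can be put as simple arithmetic: unprocessed data grows at (ingest rate - processing rate), so the store only fills up if the job lags for long enough. Below is a small Python sketch, assuming steady rates and that processed blocks are freed right away; the function name and all numbers in the example are illustrative, not measurements from this test.

```python
import math


def seconds_until_full(capacity_mb, ingest_mb_per_s, process_mb_per_s):
    """Time before the backlog outgrows the store; infinite if the job keeps up."""
    backlog_growth = ingest_mb_per_s - process_mb_per_s
    if backlog_growth <= 0:
        return math.inf  # processing keeps pace, so the backlog never grows
    return capacity_mb / backlog_growth
```

For example, with 50 GB (51200 MB) of buffer, a hypothetical 20 MB/s ingest rate and 15 MB/s processing rate, seconds_until_full(51200, 20, 15) gives 10240 seconds, a little under three hours of sustained lag before blocks would start to be lost.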

Dibyendu

Calvin Jia

May 8, 2015, 2:31:32 PM
to tachyo...@googlegroups.com, dibyendu.b...@gmail.com, jia.c...@gmail.com, fanb...@gmail.com
Got it, thanks for sharing.

Dibyendu Bhattacharya

May 11, 2015, 8:49:36 AM
to tachyo...@googlegroups.com, dibyendu.b...@gmail.com, jia.c...@gmail.com

Just to follow up on this thread further.

I was doing some fault-tolerance testing of Spark Streaming with Tachyon as the OFF_HEAP block store. As I said in my earlier email, I was able to solve the BlockNotFoundException by using Tachyon's Hierarchical Storage, which is good.

I continued testing by also storing the Spark Streaming WAL and checkpoint files in Tachyon. Here are a few findings:

When I store the Spark Streaming checkpoint location in Tachyon, the throughput is much higher. I tested the driver and receiver failure cases, and Spark Streaming is able to recover without any data loss on driver failure.

But on receiver failure, Spark Streaming loses data: I see an exception while reading the WAL file from the Tachyon "receivedData" location for the same receiver ID that just failed.

If I change the checkpoint location back to HDFS, Spark Streaming can recover from both driver and receiver failure.

Here are the log details from when the Spark Streaming receiver failed. I raised a JIRA for the same issue: https://issues.apache.org/jira/browse/SPARK-7525



INFO : org.apache.spark.scheduler.DAGScheduler - Executor lost: 2 (epoch 1)
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to remove executor 2 from BlockManagerMaster.
INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing block manager BlockManagerId(2, 10.252.5.54, 45789)
INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2 successfully in removeExecutor
INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - Registered receiver for stream 2 from 10.252.5.62:47255
WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 645603894, fileSize = 0
at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
... 15 more

INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.2 in stage 103.0 (TID 422, 10.252.5.61, ANY, 1909 bytes)
INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.2 in stage 103.0 (TID 422) on executor 10.252.5.61: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 1]
INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 2.3 in stage 103.0 (TID 423, 10.252.5.62, ANY, 1909 bytes)
INFO : org.apache.spark.deploy.client.AppClient$ClientActor - Executor updated: app-20150511104442-0048/2 is now LOST (worker lost)
INFO : org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Executor app-20150511104442-0048/2 removed: worker lost
ERROR: org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Asked to remove non-existent executor 2
INFO : org.apache.spark.scheduler.TaskSetManager - Lost task 2.3 in stage 103.0 (TID 423) on executor 10.252.5.62: org.apache.spark.SparkException (Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)) [duplicate 2]
ERROR: org.apache.spark.scheduler.TaskSetManager - Task 2 in stage 103.0 failed 4 times; aborting job
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 103.0, whose tasks have all completed, from pool 
INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Cancelling stage 103
INFO : org.apache.spark.scheduler.DAGScheduler - ResultStage 103 (foreachRDD at Consumer.java:92) failed in 0.943 s
INFO : org.apache.spark.scheduler.DAGScheduler - Job 120 failed: foreachRDD at Consumer.java:92, took 0.953482 s
ERROR: org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming job 1431341145000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 103.0 failed 4 times, most recent failure: Lost task 2.3 in stage 103.0 (TID 423, 10.252.5.62): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.IllegalArgumentException: Seek position is past EOF: 645603894, fileSize = 0
at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogRandomReader.read(FileBasedWriteAheadLogRandomReader.scala:37)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:104)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
... 15 more


log.txt

Dibyendu Bhattacharya

May 12, 2015, 1:42:15 PM
to tachyo...@googlegroups.com, dibyendu.b...@gmail.com
I guess this has something to do with Tachyon append support. I can see that the receiver throws the error below while trying to store the block to the block manager.


java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:194)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:81)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:44)
at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler$$anonfun$5.apply(ReceivedBlockHandler.scala:178)
at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler$$anonfun$5.apply(ReceivedBlockHandler.scala:178)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Calvin Jia

May 12, 2015, 3:22:04 PM
to tachyo...@googlegroups.com, dibyendu.bh...@pearson.com, dibyendu.b...@gmail.com
Thanks for digging into the problem. I think you are right about the append support causing the problem. It seems like the receiver has already created the file (before it failed) and on recovery it cannot resume writing the file.
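
The create-or-append decision behind that error can be sketched as follows. This is a toy Python model, assuming a writer that appends when the log file already exists and fails when the file system cannot append; InMemoryFs, open_for_write and write_record are hypothetical names, and only the error message is taken from the stack trace above.

```python
class InMemoryFs:
    """Hypothetical stand-in for a file system client; not a real API."""

    def __init__(self, supports_append):
        self.supports_append = supports_append
        self.files = {}  # path -> bytearray holding the file contents

    def open_for_write(self, path):
        """Return the buffer to write to: append if the file exists, else create."""
        if path in self.files:
            if not self.supports_append:
                # The restarted receiver ends up here: the file was created
                # before the failure and cannot be reopened for writing.
                raise RuntimeError("File exists and there is no append support!")
            return self.files[path]  # resume at the end of the existing file
        self.files[path] = bytearray()
        return self.files[path]


def write_record(fs, path, record):
    """Append one record to the log file at the given path."""
    fs.open_for_write(path).extend(record)
```

On a store without append, the first write_record to a path succeeds and the second one raises; with append enabled, both records end up in the same file, which matches the observation that recovery works when the checkpoint location is on HDFS.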

Dibyendu Bhattacharya

May 12, 2015, 11:50:29 PM
to tachyo...@googlegroups.com, dibyendu.b...@gmail.com, dibyendu.bh...@pearson.com
Just curious: is there any tentative time frame for when Tachyon append support will be available?

Dibyendu

Haoyuan Li

May 17, 2015, 6:46:10 PM
to Dibyendu Bhattacharya, tachyo...@googlegroups.com, Dibyendu Bhattacharya
Probably in three months. 

Best,

Haoyuan

--
Haoyuan Li

Dan Dutrow

Oct 19, 2015, 2:37:47 PM
to Tachyon Users, dibyendu.bh...@pearson.com, dibyendu.b...@gmail.com
Is this part of Tachyon 0.7? We're running 0.6.x and can't use Tachyon for checkpointing.

Jiří Šimša

Oct 20, 2015, 1:01:41 PM
to Dan Dutrow, Tachyon Users, dibyendu.bh...@pearson.com, dibyendu.b...@gmail.com
Hi Dan,

As of Tachyon 0.7.1, there is no support for append and to the best of my knowledge it will not be supported in version 0.8.0 either.

Best,

--
Jiří Šimša