Tuning Hadoop/Tachyon threads and timeouts

achu....@gmail.com

Mar 10, 2015, 4:16:02 PM
to tachyo...@googlegroups.com
I've been trying to get teragen and terasort running against a Tachyon setup backed only by SSDs.  The basic idea is to have Tachyon cache more of the data closer to memory as I scale up (e.g. hypothetically, at 8 nodes 100G out of 1 terabyte is stored on SSD, at 16 nodes 200G out of 1 terabyte is stored on SSD, etc.), so terasorts get faster as I scale up compared to when only HDFS is used.

I continually hit the following problem when I run a teragen against Tachyon:

15/03/10 10:00:11 INFO mapreduce.Job: Task Id : attempt_1426006397452_0001_m_000030_0, Status : FAILED
Error: java.io.IOException: FailedToCheckpointException(message:Failed to rename hdfs://apex69:54310/tmp/tachyon/workers/1426006000063/387/515 to hdfs://apex69:54310/tmp/tachyon/data/515)
        at tachyon.worker.WorkerClient.addCheckpoint(WorkerClient.java:116)
        at tachyon.client.TachyonFS.addCheckpoint(TachyonFS.java:183)
        at tachyon.client.FileOutStream.close(FileOutStream.java:104)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:70)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:103)
        at org.apache.hadoop.examples.terasort.TeraOutputFormat$TeraRecordWriter.close(TeraOutputFormat.java:77)
        at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.close(MapTask.java:647)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:394)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: FailedToCheckpointException(message:Failed to rename hdfs://apex69:54310/tmp/tachyon/workers/1426006000063/387/515 to hdfs://apex69:54310/tmp/tachyon/data/515)
        at tachyon.thrift.WorkerService$addCheckpoint_result$addCheckpoint_resultStandardScheme.read(WorkerService.java:3513)
        at tachyon.thrift.WorkerService$addCheckpoint_result$addCheckpoint_resultStandardScheme.read(WorkerService.java:3481)
        at tachyon.thrift.WorkerService$addCheckpoint_result.read(WorkerService.java:3407)
        at tachyon.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
        at tachyon.thrift.WorkerService$Client.recv_addCheckpoint(WorkerService.java:219)
        at tachyon.thrift.WorkerService$Client.addCheckpoint(WorkerService.java:205)
        at tachyon.worker.WorkerClient.addCheckpoint(WorkerClient.java:110)
        ... 13 more


For documentation, and for those searching on Google: these checkpoints fail because the source file of the rename is missing.  You'll see messages like this in the namenode logs:

2015-03-09 11:19:53,964 WARN org.apache.hadoop.hdfs.StateChange: DIR* FSDirectory.unprotectedRenameTo: failed to rename /tmp/tachyon/workers/1425925000006/42/59 to /tmp/tachyon/data/59 because source does not exist

Following the block number, you'd see that the block was invalidated, which is why it is missing:

2015-03-09 11:19:52,620 INFO BlockStateChange: BLOCK* addToInvalidates: blk_1073741859_1035 192.168.123.70:50010 192.168.123.72:50010 192.168.123.76:50010

Going to the datanode, you'll find the following exceptions near the scheduled deletes of those blocks  ...

    java.net.SocketException: Original Exception : java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
            at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
            at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
            at org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:223)
            at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:547)
            at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:716)
            at org.apache.hadoop.hdfs.server.datanode.DataNode$DataTransfer.run(DataNode.java:1650)
            at java.lang.Thread.run(Thread.java:682)
    Caused by: java.io.IOException: Connection reset by peer
            ... 8 more
    2015-03-09 11:19:49,833 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: checkDiskError: exception: java.net.SocketException: Original Exception : java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
            at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
            at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
            at org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:223)
            at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendPacket(BlockSender.java:547)
            at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:716)
            at org.apache.hadoop.hdfs.server.datanode.DataNode$DataTransfer.run(DataNode.java:1650)
            at java.lang.Thread.run(Thread.java:682)
    Caused by: java.io.IOException: Connection reset by peer
            ... 8 more


So it appears there were socket errors (timeouts?) and a block couldn't be replicated properly, so it was invalidated.

So I increased the thread count on the Hadoop namenode.  This removed the problem at low node counts, but it had to be increased regularly as I scaled up and by itself wasn't enough.

Increasing tachyon.worker.user.timeout.ms also helped; however, I can't raise it too far.

Increasing tachyon.worker.checkpoint.threads doesn't seem to help, b/c it only exacerbates the problem by hammering the namenode more.

Eventually, setting 64 threads on the namenode and tachyon.worker.user.timeout.ms to 30000 got me a teragen without errors once I scaled up to 64 nodes.

I'm curious whether others have needed this type of configuration.  Or is there a configuration that I'm just missing or not seeing?  I'm a little concerned about scalability as I increase my node count further.  Or perhaps this is what everyone does?  Or perhaps I will just have to adjust the number of map tasks/my job in general to take advantage of Tachyon?  At some point, more threads and longer timeouts just aren't going to cut it.

I should probably mention that all of my original HDFS settings scale well when it's HDFS only.  So I believe the issue is in fact the increased traffic going on due to tachyon.

As a secondary note, I am actually doing HDFS over Lustre (a parallel filesystem).  Take that for what it's worth, as some reads/writes to disk may be slower than what people are used to.

Thanks,

Al


Haoyuan Li

Mar 11, 2015, 12:35:59 AM
to achu....@gmail.com, tachyo...@googlegroups.com
Al,

Which Tachyon version are you running?

Haoyuan


--
Haoyuan Li
AMPLab, EECS, UC Berkeley

achu....@gmail.com

Mar 11, 2015, 1:56:17 AM
to tachyo...@googlegroups.com, achu....@gmail.com
The newest one, 0.6.0.  I read in some prior posts there were fixes in earlier versions for the above rename exception.

Al

Haoyuan Li

Mar 12, 2015, 3:04:57 PM
to achu....@gmail.com, tachyo...@googlegroups.com
I see. This is a timeout issue. It should have been fixed.


Thanks,

Haoyuan

achu....@gmail.com

Mar 13, 2015, 12:39:43 AM
to tachyo...@googlegroups.com, achu....@gmail.com
On Thursday, March 12, 2015 at 12:04:57 PM UTC-7, Haoyuan Li wrote:
I see. This is a timeout issue. It should have been fixed.


Sure, I'll give it a shot and send a pull request.

Al

Haoyuan Li

Mar 13, 2015, 2:13:30 AM
to achu....@gmail.com, tachyo...@googlegroups.com
Merged. Thanks!

Haoyuan

achu....@gmail.com

Mar 14, 2015, 11:02:45 PM
to tachyo...@googlegroups.com, achu....@gmail.com
I think I figured out the problem after looking through the Tachyon code and deciphering some of the log messages.

For those Googling this: I eventually realized that the timeouts happened because heartbeat messages from the WorkerClient (WorkerClientHeartbeatExecutor) were not being sent in a timely fashion.  Inside the Hadoop job syslogs, you can find messages like this:

worker-heartbeat-apex82.llnl.gov/192.168.123.82:29998 last execution took 32433 ms. Longer than  the mFixedExecutionIntervalMs 1000

In the above case, you can see that even though a heartbeat is supposed to be sent every 1000 ms, this particular one took 32433 ms between heartbeats.  This is well past the 10000 ms timeout length.
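
For anyone who wants to check their own job syslogs for the same symptom, here is a minimal sketch that pulls the reported interval out of messages like the one above and flags those over a timeout. The class and method names are hypothetical (this is not Tachyon code), and the regular expression assumes the exact message wording quoted above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Tachyon: scans log lines for slow worker heartbeats.
class HeartbeatLogScan {
  // Assumes the message wording quoted in this post: "... last execution took 32433 ms. ..."
  private static final Pattern TOOK = Pattern.compile("last execution took (\\d+) ms");

  // Returns the reported interval in ms, or -1 if the line is not a heartbeat warning.
  static long parseTookMs(String line) {
    Matcher m = TOOK.matcher(line);
    return m.find() ? Long.parseLong(m.group(1)) : -1L;
  }

  // Collects the lines whose reported interval exceeds the given timeout.
  static List<String> overTimeout(List<String> lines, long timeoutMs) {
    List<String> hits = new ArrayList<>();
    for (String line : lines) {
      if (parseTookMs(line) > timeoutMs) {
        hits.add(line);
      }
    }
    return hits;
  }

  public static void main(String[] args) {
    String sample = "worker-heartbeat-apex82.llnl.gov/192.168.123.82:29998 last execution took "
        + "32433 ms. Longer than  the mFixedExecutionIntervalMs 1000";
    // 10000 ms is the timeout length mentioned in this post.
    for (String hit : overTimeout(Arrays.asList(sample), 10_000L)) {
      System.out.println("heartbeat over timeout: " + parseTookMs(hit) + " ms");
    }
  }
}
```

Passing the raised timeout (30000) instead of 10000 shows which heartbeats would still have been late after the tuning described earlier.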

So, what this means is the thread isn't being scheduled.  As an attempt to fix this, I increased the thread priority of the WorkerClientHeartbeatExecutor thread.  However, that didn't help the situation.

So it was likely the case that some thread was hogging the CPU and not allowing this thread to run.  So what's running?

The logs showed something like this:

2015-03-14 08:38:18,591 INFO [main] : create(tachyon://apex75:19998/terasort-teragen/_temporary/1/_temporary/attempt_1426347380120_0001_m_000002_0/part-m-00002, rw-r--r--, true, 1048576, 1, 33554432, null)

So eventually, after digging into the Tachyon code more, I looked into AbstractTFS and noticed this in create():

    if (!CommonConf.get().ASYNC_ENABLED) {
      TachyonURI path = new TachyonURI(Utils.getPathWithoutScheme(cPath));
      if (mTFS.exist(path)) {
        if (!mTFS.delete(path, false)) {
          throw new IOException("Failed to delete existing data " + cPath);
        }
      }
      int fileId = mTFS.createFile(path, blockSize);
      TachyonFile file = mTFS.getFile(fileId);
      file.setUFSConf(getConf());
      return new FSDataOutputStream(file.getOutStream(UserConf.get().DEFAULT_WRITE_TYPE), null);
    }

The ASYNC_ENABLED config resolves to "getBooleanProperty("tachyon.async.enabled", false)", so by default we fall into the if statement.

DEFAULT_WRITE_TYPE resolves to "getEnumProperty("tachyon.user.file.writetype.default", WriteType.CACHE_THROUGH);"

Well, CACHE_THROUGH is ...

  /**
   * Write the file synchronously to the under fs, and also try to cache it,
   */
  CACHE_THROUGH(3),

This sounds like something that could definitely make the thread wait due to blocking IO.

So I changed the default to 

  /**
   * Write the file asynchronously to the under fs (either must cache or must through).
   */
  ASYNC_THROUGH(5);

as an initial test and everything worked.
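
To illustrate the difference between the two write types, here is a toy model of a synchronous versus an asynchronous write-through. It is only a sketch under stated assumptions: none of these names exist in Tachyon, and the "under fs write" is simulated with a latch rather than real I/O.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model only; none of these names exist in Tachyon.
class WriteModeModel {
  // Stands in for a slow under-filesystem write: blocks until the latch is released.
  static void slowUnderFsWrite(CountDownLatch storageReady) {
    try {
      storageReady.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Synchronous style: the caller does not return until the write has finished.
  static void closeSync(CountDownLatch storageReady) {
    slowUnderFsWrite(storageReady);
  }

  // Asynchronous style: the write runs on another thread and the caller returns at once.
  static CompletableFuture<Void> closeAsync(CountDownLatch storageReady, ExecutorService pool) {
    return CompletableFuture.runAsync(() -> slowUnderFsWrite(storageReady), pool);
  }

  // Returns true if the async close came back while the simulated write was still pending.
  static boolean asyncReturnsBeforeWriteCompletes() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      CountDownLatch storageReady = new CountDownLatch(1);
      CompletableFuture<Void> pending = closeAsync(storageReady, pool);
      boolean stillPending = !pending.isDone(); // storage has not been released yet
      storageReady.countDown();                 // let the simulated write finish
      pending.join();                           // wait for the background write
      return stillPending && pending.isDone();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    CountDownLatch alreadyReady = new CountDownLatch(0);
    closeSync(alreadyReady); // returns only because the storage is already "ready"
    System.out.println("async returned early: " + asyncReturnsBeforeWriteCompletes());
  }
}
```

In the synchronous style, a slow under filesystem holds the calling thread for the whole write, which matches the blocking behavior suspected above.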

I eventually moved to putting "-Dtachyon.async.enabled=true -Dtachyon.user.file.writetype.default=ASYNC_THROUGH" in mapred.child.java.opts, mapred.map.child.java.opts, and mapred.reduce.child.java.opts to get teragen & terasort working.

It took quite some time to get from "Failed to rename" exceptions all the way to tachyon.user.file.writetype.default, so I hope this is useful to other people Googling for answers out there.
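
As a rough sketch of how those two flags change what the client sees, the following mimics the default lookups quoted earlier (false for tachyon.async.enabled, CACHE_THROUGH for tachyon.user.file.writetype.default). The class, the option parser, and the two-value enum are assumptions for illustration; they are not Tachyon's actual configuration code, and the enum lists only the write types quoted in this thread.

```java
import java.util.Properties;

// Illustrative sketch; the helpers mimic, but are not, Tachyon's configuration code.
class WriteTypeDefaults {
  // Only the write types quoted in this thread; other values are omitted.
  enum WriteType { CACHE_THROUGH, ASYNC_THROUGH }

  // Turns a JVM option string such as "-Da=b -Dc=d" into properties.
  static Properties parseJavaOpts(String opts) {
    Properties props = new Properties();
    for (String token : opts.trim().split("\\s+")) {
      int eq = token.indexOf('=');
      // Only tokens of the form -Dname=value count as property definitions.
      if (token.startsWith("-D") && eq > 2) {
        props.setProperty(token.substring(2, eq), token.substring(eq + 1));
      }
    }
    return props;
  }

  // Mirrors the quoted default: async is off unless the property says otherwise.
  static boolean asyncEnabled(Properties props) {
    return Boolean.parseBoolean(props.getProperty("tachyon.async.enabled", "false"));
  }

  // Mirrors the quoted default: CACHE_THROUGH unless the property names another type.
  static WriteType defaultWriteType(Properties props) {
    String name = props.getProperty("tachyon.user.file.writetype.default");
    return name == null ? WriteType.CACHE_THROUGH : WriteType.valueOf(name);
  }

  public static void main(String[] args) {
    Properties none = parseJavaOpts("");
    Properties tuned = parseJavaOpts("-Dtachyon.async.enabled=true "
        + "-Dtachyon.user.file.writetype.default=ASYNC_THROUGH");
    System.out.println(asyncEnabled(none) + " " + defaultWriteType(none));
    System.out.println(asyncEnabled(tuned) + " " + defaultWriteType(tuned));
  }
}
```

The parser only accepts tokens where -D is immediately followed by the property name, which is also the form the JVM expects on its command line.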

I'll try to update the Configuration documentation with values, b/c the current one wasn't clear on what options were available other than CACHE_THROUGH.

Al

Haoyuan Li

Mar 16, 2015, 2:42:40 PM
to achu....@gmail.com, tachyo...@googlegroups.com
Thanks for the updates, Al.

Cheers,

Haoyuan