Compressed output corrupted by map job


Rory Douglas

Jan 14, 2015, 10:36:00 AM
to hydr...@googlegroups.com
I have a simple map job (let's call it Job A) that does minimal transformation - its primary purpose is to multiplex the output of a Hoover job into a smaller set of files (for DR/archiving purposes).

I'm encountering issues with the downstream consumption job (Job B) where the output of Job A appears to be corrupted. I've tried both Snappy and LZF compression and encountered issues with both:

For Snappy: an IOException: failed to uncompress the chunk: PARSING_ERROR(2)

For LZF: an IOException on decodeBundle: type mismatch (86 = null) != valid Bundle type

In both cases the upstream job (Job A) didn't throw any exceptions or log any issues when writing the outputs. Note, the problems only occur for some of the output files.

Any advice on how to debug this? Should I be avoiding the alternate compression types & stick with GZIP (that's the next thing on my list)? Are there any settings that may affect/trigger these problems?

Right now I have the following set (these were added in an attempt to counteract OOM errors in the multiplex job - a discussion for another post):

// -Dtask.queue.depth=2
// -DdataSourceMeshy2.buffer=10
// -DdataSourceMeshy2.workers=1

We're using 4.2.11 (the version Albert mentioned in his production deployment post).

Thanks for any help!

Ian Barfield

Jan 14, 2015, 5:23:45 PM
to Rory Douglas, hydr...@googlegroups.com
First I'd make sure the files themselves are actually corrupted and that it's not something on the transport layer. Are the errors easily reproduced for the same files? I know with GZIP you can do `gzip -t` on the command line, so you might also want to see if the others have equivalents.
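For instance, a quick integrity check on a GZIP-compressed output could look like the following (the filename is just a placeholder):

    gzip -t job-a-output-000.gz && echo "file looks intact"

As far as I know Snappy and LZF don't ship an equally standard command-line test, so for those it may be simplest to re-read the suspect files through the same decompressor the downstream job uses.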

I wouldn't expect those system properties to affect much other than maybe throttling job speed (outside of memory usage and then only if you have super giant bundles).



Ian Barfield

Jan 15, 2015, 8:08:01 PM
to Rory Douglas, Rory Douglas, hydr...@googlegroups.com
Several MB compressed probably counts as "super giant". I can see why you are lowering the buffer sizes that are measured in bundle counts. Those are the only such buffers that come to mind, but there are also ways to control how many of them are created. For example, there is one of those 2-depth queues per task thread, and one of those 10-depth queues per mesh source (if using an aggregate source or something).
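As a rough back-of-the-envelope illustration (the thread and source counts are made-up examples; only the queue depths and bundle sizes come from this thread):

    4 task threads x task.queue.depth (2) x ~5 MB bundles         ≈ 40 MB queued
    1 mesh source  x dataSourceMeshy2.buffer (10) x ~5 MB bundles ≈ 50 MB buffered

so with multi-MB bundles those queues alone can account for a noticeable chunk of heap.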

Normally I would mostly control memory usage via node/page cache allowances in a tree output. Your version is a little old, so maybe mux is still a little temperamental? I usually find it easiest to just take a look at the heap usage in these cases.

On Thu, Jan 15, 2015 at 6:49 PM, Rory Douglas <rory.d...@newbrandanalytics.com> wrote:
Thanks Ian!  I tried setting up parallel job chains (one without multiplex), since I don't have a quick way to pull files out of muxy directories.  However, now I can't reproduce the issue (in either  multiplexed or non-multiplexed scenarios).  I'll see if I can re-create it & report back.

To your point about the task.queue.depth/buffers/workers tweaks, we are actively trying to avoid OOM errors that occur in this archiving job.  I'm not sure what the definition of "super giant bundles" is though? We have bundles ranging from 1K to a few MB (gzipped size) - at least at this archive step where each input file to the multiplexing job contains a single-line JSON payload that we haven't parsed yet.

So far, the only way I've found to prevent OOM errors during this job step is to effectively disable buffering & the output bundle cache (the params I mentioned previously), plus maxBundles: 1, bufferSizeRatio: 1 in the output section.  I've also enabled waitForDiskFlushThread.  Are there any other levers to pull to control job memory usage?

kui....@newbrandanalytics.com

Feb 19, 2015, 3:20:08 PM
to hydr...@googlegroups.com, rory.d...@newbrandanalytics.com, rory1d...@gmail.com
Ian, this is Kui. I also work with Rory on the same archive job mentioned in this thread.

We managed to make the job work in a lower environment with 1/10 of the production data, and it works fine there. However, when we moved the job to the prod environment we ran into an OOM issue. It looks like the tweaks we made for the lower env don't hold up in a more intensive environment (i.e. more files: 150k vs. 3M). So I am back here seeking help.

The job carries the settings below, which work in the lower env.
task.queue.depth=1
dataSourceMeshy2.buffer=1
dataSourceMeshy2.workers=1
and
diskFlushThreads: 1,
maxBundles: 1,
bufferSizeRatio: 1,
waitForDiskFlushThread: true,

When moving to the prod env, we run into this OOM when using Snappy compression with muxy v2.0.12:
Exception in thread "OutputWriterDiskFlushThread-0" java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:658)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:434)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:179)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:168)
at io.netty.buffer.PoolArena.reallocate(PoolArena.java:277)
at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:108)
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:811)
at io.netty.buffer.ByteBufOutputStream.write(ByteBufOutputStream.java:66)
at com.addthis.muxy.MuxStreamDirectory$StreamOut.write(MuxStreamDirectory.java:434)
at com.addthis.muxy.MuxStreamDirectory$StreamOutWriter.write(MuxStreamDirectory.java:489)
at com.addthis.muxy.MuxFileDirectory$StreamsWriter.write(MuxFileDirectory.java:403)
at org.xerial.snappy.SnappyOutputStream.dump(SnappyOutputStream.java:297)
at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:244)
at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:99)
at java.io.OutputStream.write(OutputStream.java:75)
at com.addthis.hydra.task.output.DefaultOutputWrapper.write(DefaultOutputWrapper.java:212)
at com.addthis.hydra.task.output.OutputWriter.dequeueWrite(OutputWriter.java:184)
at com.addthis.hydra.task.output.AbstractOutputWriter$DiskFlushThread.run(AbstractOutputWriter.java:346)


Since it was reported as an OOM on direct buffer memory, I noticed Hydra uses Netty's (v4.0.19) direct memory buffers here, and those buffers look like they are being managed by Hydra. So we tried increasing the direct memory size from the default value to
// -XX:MaxDirectMemorySize=768m
I hooked up a Java debugger to the task VM and confirmed that at the OOM point 768MB was indeed set for the direct memory buffer. However, it still got an OOM.

I noticed in your reply that you mentioned controlling node/page numbers in a tree job. However, this is a "file" output job, not a "tree" output job. You also mentioned monitoring heap usage. I am not sure which heap you meant, but I hooked up JConsole and have not observed any surge or limit being hit in either heap or non-heap usage.
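As an aside, direct buffer usage is visible through the standard BufferPool MXBeans -- JConsole's default heap/non-heap graphs don't include direct buffers. Here is a minimal sketch using plain JMX (nothing Hydra-specific; the class name is arbitrary):

    import java.lang.management.BufferPoolMXBean;
    import java.lang.management.ManagementFactory;

    public class DirectBufferUsage {
        public static void main(String[] args) {
            // The "direct" pool reflects ByteBuffer.allocateDirect allocations,
            // which is the memory that -XX:MaxDirectMemorySize caps.
            for (BufferPoolMXBean pool :
                    ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
                System.out.printf("%s: count=%d, used=%d bytes, capacity=%d bytes%n",
                        pool.getName(), pool.getCount(),
                        pool.getMemoryUsed(), pool.getTotalCapacity());
            }
        }
    }

The same pools also show up in JConsole's MBeans tab under java.nio:type=BufferPool.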

It looks like we have minimized the footprint as much as we can and increased the provision of direct memory. Can you shed some light on this issue, please?

Btw, we will meet up at AddThis next Tuesday. I am bringing some of my favorite teas for you, 'Tea Dragon'. :-)

Ian Barfield

Feb 20, 2015, 12:39:14 PM
to kui....@newbrandanalytics.com, hydr...@googlegroups.com, Rory Douglas, Rory Douglas
Since this is direct memory, most of those settings probably don't affect this error. I'm guessing this is all down to muxy / file output settings. That version of muxy is surprisingly recent, and I think it includes some changes I made a while back that were supposed to work around some shortcomings in the file output, so you might be able to resolve this with just muxy settings.

The most salient options are probably (with our default settings pulled from our config file):
# in MB, the target buffer size per directory
muxy.default.block.size: 100
# max aggregate buffer size across all (active) directories
muxy.cache.bytes.max: 157286400
# target cached directory count (can't evict active directories though)
muxy.cache.dir.max: 5

The in-code defaults (in master anyway) look like 5 for the block size and double that for the cache bytes max, but these might have been set somewhere as system properties or as certain nested fields in the output config.

Other possibilities that spring to mind:
- That version of Netty might be one with a double-memory-usage bug I later fixed; upgrading that version should be pretty safe.
- Even without a tree output, the marks DB in many source types still uses the same page cache, so if those settings are way off for some reason, I guess it could still have an impact. At some point we started using pooled buffers in the tree/page cache in some places, so that could theoretically be interesting if the other things don't pan out.

Kui Zhang

Feb 20, 2015, 10:23:14 PM
to Ian Barfield, hydr...@googlegroups.com, Rory Douglas, Rory Douglas, Albert Law
Ian, thank you so much for the prompt response. 

We tweaked the settings a bit per your advice, as below:
    // # in MB, the target buffer size per directory (default: 100)
    -Dmuxy.default.block.size=20
    // # max aggregate buffer size across all (active) directories (default: 157286400)
    -Dmuxy.cache.bytes.max=5242880
    // # target cached directory count (can't evict active directories though) (default: 5)
    -Dmuxy.cache.dir.max=3

It seems to be working now. We are going to double-check it. Meanwhile, can you take a look at the settings we have now and see if they look good?

Kui Zhang

Senior Software Developer | NewBrand
703.867.4229

Ian Barfield

Feb 20, 2015, 10:41:20 PM
to Kui Zhang, Rory Douglas, Rory Douglas, Albert Law, hydr...@googlegroups.com

It might not make sense to have a 20MB block size and a 5MB total buffer allowance. You also need to be careful when lowering these values, as they can dramatically increase stream fragmentation on disk (impacting later read efficiency).
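To make the mismatch concrete (this assumes the aggregate cap forces buffers to be written out early, which is my reading of the settings above):

    3 cached directories (muxy.cache.dir.max=3) sharing muxy.cache.bytes.max = 5 MB
        => roughly 5 MB / 3 ≈ 1.7 MB buffered per directory before data has to go to disk,
           far below the 20 MB muxy.default.block.size target,
           so each stream ends up written as many small blocks (i.e. more fragmentation to read back later).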

In theory, a 150MB max cache size should be well under the amount in your OOM example, so there is probably something else going on. That said, if your read speeds seem fine for your needs, then go nuts I guess.

Kui Zhang

Feb 20, 2015, 10:44:41 PM
to Ian Barfield, Rory Douglas, Rory Douglas, Albert Law, hydr...@googlegroups.com
Thanks. We will give the more aggressive settings you suggested a try.

Kui Zhang

Feb 21, 2015, 9:14:42 PM
to Ian Barfield, Rory Douglas, Rory Douglas, Albert Law, hydr...@googlegroups.com
Ian, this is as far as I can get before the job raises an OOM:
    // # in MB, the target buffer size per directory (default: 100)
    // -Dmuxy.default.block.size=100
    // # max aggregate buffer size across all (active) directories (default: 157286400)
    // -Dmuxy.cache.bytes.max=5242880
    // # target cached directory count (can't evict active directories though) (default: 5)
    // -Dmuxy.cache.dir.max=5

It seems
 -Dmuxy.cache.bytes.max=5242880
is still the best we can get. Would this impact performance a lot?

Ian Barfield

Feb 23, 2015, 10:08:34 AM
to Kui Zhang, Rory Douglas, Rory Douglas, hydr...@googlegroups.com, Albert Law

Well, it depends how many files you're writing simultaneously per directory and what your hard drive setup is. If the former is single digits or the latter is "all SSDs", then you're probably fine. If not, then I guess it scales along those lines, up to where "100 files per directory at once" means probably not fine.

Having to stop at 5MB makes me think your writes are some strange case, like a bunch of directories per task with bursty writes or something. Newer versions of muxy / netty might be better at handling those. On the other hand, if you're writing to a lot of directories, maybe you aren't writing a lot of files per directory -- if your shards each have their own directory or something, that could easily be the case.
