trouble with current tip


Roland Gude

Apr 26, 2012, 9:44:25 AM
to peregrine...@googlegroups.com
Hi,

While reducing, I currently get this error all the time:

2012-04-26 15:38:40,151 INFO [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.reduce.ReduceRunner Working with 1 readers now.
2012-04-26 15:38:40,151 INFO [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.reduce.ReduceRunner Merging on final merge with 1 readers (strategy=finalMerge, pass=1)
2012-04-26 15:38:40,152 INFO [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.sysstat.LinuxSystemProfiler Mount point resolved to: mapper/rahn-datap1
2012-04-26 15:38:40,153 INFO [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.util.netty.PrefetchReader Running with buffer size: 262,144,000 and per file capacity: 262,144,000
2012-04-26 15:38:40,153 INFO [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.reduce.merger.ChunkMerger Merging 1 readers.
2012-04-26 15:38:40,263 INFO [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.reduce.ReduceRunner Purging directory /mnt/data/peregrine/rahn/11112/7/tmp/cf-correlate.0 for pass 0
2012-04-26 15:38:40,263 ERROR [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.task.BaseTask Unable to run delegate class com.yoochoose.peregrine.jobs.correlation.CorrelationReducer on partition:00000007 (rahn:11112)
java.io.IOException: Unable to prefetch: /mnt/data/peregrine/rahn/11112/7/tmp/cf-correlate.0/sorted-0.tmp at offset=131,072 and length=131,072 and current capacity 262,144,000
        at peregrine.util.netty.PrefetchReader.handleThrowable(PrefetchReader.java:315)
        at peregrine.util.netty.PrefetchReader.doCache(PrefetchReader.java:282)
        at peregrine.util.netty.PrefetchReader.access$300(PrefetchReader.java:41)
        at peregrine.util.netty.PrefetchReader$PrefetchStreamReaderListener.onRead(PrefetchReader.java:374)
        at peregrine.util.netty.StreamReader.fireOnRead(StreamReader.java:107)
        at peregrine.util.netty.StreamReader.read(StreamReader.java:59)
        at peregrine.util.VarintReader.read1(VarintReader.java:42)
        at peregrine.util.VarintReader.read(VarintReader.java:37)
        at peregrine.io.chunk.DefaultChunkReader.readEntry(DefaultChunkReader.java:241)
        at peregrine.io.chunk.DefaultChunkReader.next(DefaultChunkReader.java:170)
        at peregrine.reduce.merger.MergeQueueEntry.<init>(MergeQueueEntry.java:40)
        at peregrine.reduce.merger.MergerPriorityQueue.<init>(MergerPriorityQueue.java:47)
        at peregrine.reduce.merger.ChunkMerger.merge(ChunkMerger.java:144)
        at peregrine.reduce.merger.ChunkMerger.merge(ChunkMerger.java:123)
        at peregrine.reduce.ReduceRunner.finalMerge(ReduceRunner.java:172)
        at peregrine.reduce.ReduceRunner.reduce(ReduceRunner.java:102)
        at peregrine.task.ReducerTask.doCall(ReducerTask.java:89)
        at peregrine.task.BaseTask.call(BaseTask.java:196)
        at peregrine.task.ReducerTask.call(ReducerTask.java:59)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Unable to mlock:
        at peregrine.os.mman.mlock(mman.java:299)
        at peregrine.util.netty.PrefetchReader.cache(PrefetchReader.java:213)
        at peregrine.util.netty.PrefetchReader.doCache(PrefetchReader.java:280)
        ... 22 more
Caused by: peregrine.os.PlatformException: Cannot allocate memory
        ... 25 more
peregrine.io.util.GroupIOException
        at peregrine.io.util.BaseCloser.exec(BaseCloser.java:84)
        at peregrine.io.util.Closer.close(Closer.java:41)
        at peregrine.reduce.ReduceRunner.finalMerge(ReduceRunner.java:178)
        at peregrine.reduce.ReduceRunner.reduce(ReduceRunner.java:102)
        at peregrine.task.ReducerTask.doCall(ReducerTask.java:89)
        at peregrine.task.BaseTask.call(BaseTask.java:196)
        at peregrine.task.ReducerTask.call(ReducerTask.java:59)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
2012-04-26 15:38:40,263 INFO [peregrine.worker.rpcd.delegate.ReducerRPCDelegate:1] peregrine.task.BaseTask Closing job output: peregrine.io.partition.PartitionWriterJobOutput@70a0afab

The referenced file does not exist, but I don't understand what I am doing wrong.

burtonator

Apr 27, 2012, 3:50:28 PM
to peregrine...@googlegroups.com
Is it a race condition? Meaning, maybe you read the file too soon?

Could you break your job into steps to see which one isn't writing the file to disk?

I have some more fixes pending, but I don't think this is one of those issues.

Kevin

Roland Gude

Apr 30, 2012, 6:52:03 AM
to peregrine...@googlegroups.com
I was able to work around it by disabling mlock in the conf.
I don't know how I could be reading too soon.
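
For anyone hitting the same trace: "Cannot allocate memory" is the ENOMEM text that mlock can return when a process asks to lock more memory than its locked-memory limit (RLIMIT_MEMLOCK) allows. Whether that is the cause here is not confirmed in this thread. The sketch below is a hypothetical helper, not part of Peregrine, that reads the soft limit from /proc/self/limits on Linux and compares it with the 262,144,000-byte buffer size reported in the log above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper (not part of Peregrine): checks whether a buffer of a
// given size could be locked under the current "Max locked memory" limit.
class LockedMemoryLimit {

    private static final String LABEL = "Max locked memory";

    /**
     * Parses the soft limit from the lines of /proc/self/limits.
     * Returns the limit in bytes, -1 for "unlimited", or -2 if the row is missing.
     */
    static long parseSoftLimit(List<String> limitsLines) {
        for (String line : limitsLines) {
            if (line.startsWith(LABEL)) {
                // Remaining columns are: soft limit, hard limit, units.
                String[] cols = line.substring(LABEL.length()).trim().split("\\s+");
                if (cols[0].equals("unlimited")) {
                    return -1L;
                }
                return Long.parseLong(cols[0]);
            }
        }
        return -2L;
    }

    /** True if a request of requestedBytes is within the soft limit. */
    static boolean fits(long softLimit, long requestedBytes) {
        return softLimit == -1L || (softLimit >= 0 && requestedBytes <= softLimit);
    }

    public static void main(String[] args) throws IOException {
        // Buffer size taken from the PrefetchReader log line in this thread.
        long requested = 262_144_000L;
        // Linux only: this file does not exist on other platforms.
        List<String> lines =
                Files.readAllLines(Paths.get("/proc/self/limits"), StandardCharsets.UTF_8);
        long soft = parseSoftLimit(lines);
        System.out.println("soft limit = " + soft + ", request fits = " + fits(soft, requested));
    }
}
```

The limit applies to the process as a whole, so the check is only meaningful when run as the same user and under the same limits as the worker daemon.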

It's a very simple job, and this happens only after the reduce phase has finished and a second mapper runs (which only emits the values from the result in order to shuffle them for the next reduce).

If I disable mlock, this error goes away, but during shuffling the job fails because the ShuffleReceiver receives negative counts. (I added logging statements and found that the count is always positive in the sender but is negative every once in a while in the receiver.)

burtonator

May 1, 2012, 7:24:58 PM
to peregrine...@googlegroups.com
I realized that the best solution here would be for you to create a test case... then I can take the test case and rip it apart and fix it.

I'm going to be going heads down again soon, so I should stumble upon the bug if it's pretty obvious.

Kevin

Roland Gude

May 2, 2012, 6:30:45 AM
to peregrine...@googlegroups.com
Great, I will try to do that.

Roland Gude

May 3, 2012, 7:51:43 AM
to peregrine...@googlegroups.com
Actually, any Mapper on a sufficiently long-running job leads to this stack trace:

2012-05-03 12:22:00,728 ERROR [peregrine.controller.rpcd.ControllerRPCHandler:0] peregrine.rpcd.BaseRPCHandler Unable handle message: ?action=flush
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at peregrine.rpcd.delegate.RPCDelegate.handleMessage(RPCDelegate.java:48)
        at peregrine.worker.rpcd.FSDaemonRPCHandler.handleMessage(FSDaemonRPCHandler.java:75)
        at peregrine.rpcd.BaseRPCHandler$1.doAction(BaseRPCHandler.java:104)
        at peregrine.rpcd.BaseRPCHandler$AsyncMessageHandler.run(BaseRPCHandler.java:144)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
Caused by: java.io.IOException: Failed to close writer:
        at peregrine.shuffle.receiver.ShuffleReceiver.rollover(ShuffleReceiver.java:116)
        at peregrine.shuffle.receiver.ShuffleReceiver.close(ShuffleReceiver.java:132)
        at peregrine.shuffle.receiver.ShuffleReceiverFactory.flush(ShuffleReceiverFactory.java:81)
        at peregrine.worker.rpcd.delegate.ShufflerRPCDelegate.flush(ShufflerRPCDelegate.java:37)
        ... 14 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: count < 0
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
        at java.util.concurrent.FutureTask.get(FutureTask.java:83)
        at peregrine.shuffle.receiver.ShuffleReceiver.rollover(ShuffleReceiver.java:114)
        ... 17 more
Caused by: java.io.IOException: count < 0
        at peregrine.shuffle.ShuffleOutputWriter.buildLookup(ShuffleOutputWriter.java:142)
        at peregrine.shuffle.ShuffleOutputWriter.close(ShuffleOutputWriter.java:164)
        at peregrine.shuffle.receiver.ShuffleReceiverFlushCallable.call(ShuffleReceiverFlushCallable.java:37)
        ... 5 more


Possibly related is this DEBUG message, logged a little while before the error:

2012-05-03 13:47:17,831 DEBUG [peregrine.worker.rpcd.delegate.MapperRPCDelegate:0] org.apache.hadoop.conf.Configuration java.io.IOException: config(config)
        at org.apache.hadoop.conf.Configuration.<init>(Configuration.java:259)
        at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:341)
        at org.apache.hadoop.mapreduce.JobContext.<init>(JobContext.java:76)
        at org.apache.hadoop.mapreduce.TaskAttemptContext.<init>(TaskAttemptContext.java:35)
        at peregrine.io.driver.cassandra.CassandraJobInput.<init>(CassandraJobInput.java:60)
        at peregrine.io.driver.cassandra.CassandraIODriver.getJobInput(CassandraIODriver.java:96)
        at peregrine.task.BaseMapperTask.getJobInput(BaseMapperTask.java:119)
        at peregrine.task.MapperTask.doCall(MapperTask.java:48)
        at peregrine.task.BaseTask.call(BaseTask.java:196)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)

Roland Gude

May 3, 2012, 11:25:24 AM
to peregrine...@googlegroups.com
I isolated the issue further.

Apparently the count goes wrong during shuffling if the size of a single key/value pair is larger than MAX_HTTP_CHUNK_SIZE. Avoiding that made the error go away (once I told the bytebuffercloser not to fail if it did not receive a direct buffer), but unfortunately ChunkSorter now throws something similar:

ERROR [main] peregrine.task.Scheduler Failed to handle task: rahn:11112:[partition:00000007]
 java.io.IOException: Unable to sort [peregrine.shuffle.ShuffleInputChunkReader:/mnt/data/peregrine/rahn/11112/tmp/shuffle/log/0000000000.tmp:partition:00000007, peregrine.shuffle.ShuffleInputChunkReader:/mnt/data/peregrine/rahn/11112/tmp/shuffle/log/0000000001.tmp:partition:00000007] for partition:00000007
        at peregrine.reduce.sorter.ChunkSorter.sort(ChunkSorter.java:128)
        at peregrine.reduce.sorter.ChunkSorter.sort(ChunkSorter.java:67)
        at peregrine.reduce.ReduceRunner.sort(ReduceRunner.java:409)
        at peregrine.reduce.ReduceRunner.reduce(ReduceRunner.java:90)
        at peregrine.task.ReducerTask.doCall(ReducerTask.java:89)
        at peregrine.task.BaseTask.call(BaseTask.java:196)
        at peregrine.task.ReducerTask.call(ReducerTask.java:59)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: java.lang.IllegalArgumentException: bad count -1
        at peregrine.shuffle.ShuffleInputChunkReader.assertPrefetchReaderNotFailed(ShuffleInputChunkReader.java:175)
        at peregrine.shuffle.ShuffleInputChunkReader.hasNext(ShuffleInputChunkReader.java:119)
        at peregrine.io.chunk.CompositeChunkReader.hasNext(CompositeChunkReader.java:105)
        at peregrine.reduce.sorter.KeyLookup.<init>(KeyLookup.java:99)
        at peregrine.reduce.sorter.ChunkSorter.sort(ChunkSorter.java:93)
        ... 11 more
Caused by: java.lang.IllegalArgumentException: bad count -1
        at peregrine.shuffle.ShufflePacket.<init>(ShufflePacket.java:40)
        at peregrine.shuffle.ShuffleInputReader.next(ShuffleInputReader.java:204)
        at peregrine.shuffle.ShuffleInputChunkReader$PrefetchReader.call(ShuffleInputChunkReader.java:362)
        ... 5 more


I added the IllegalArgumentException myself in order to investigate the previous error. I am not sure whether it is still an issue in the ReduceRunner. I will let you know if I find something.
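
A fail-fast guard of the kind described above could look roughly like the following. This is a minimal sketch, not Peregrine's code: the class name, the method, and the record layout (a 4-byte length prefix followed by the payload) are assumptions made for illustration. The point is only that a corrupted count is rejected at the moment it is read, rather than surfacing later as "count < 0" when the writer is closed.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration (not Peregrine's ShufflePacket): validate a
// length prefix as soon as it is read instead of trusting it downstream.
class CountGuard {

    /**
     * Reads a 4-byte count followed by that many payload bytes.
     * The layout is an assumption for this sketch only.
     */
    static byte[] readPayload(ByteBuffer in) {
        int count = in.getInt();
        if (count < 0) {
            // Same style of message as the guard mentioned in this thread.
            throw new IllegalArgumentException("bad count " + count);
        }
        if (count > in.remaining()) {
            throw new IllegalArgumentException(
                    "count " + count + " exceeds remaining bytes " + in.remaining());
        }
        byte[] payload = new byte[count];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putInt(3).put(new byte[] {1, 2, 3}).flip();
        System.out.println("payload length = " + readPayload(buf).length);
    }
}
```

Checking the count against the remaining bytes as well catches a prefix that is positive but still inconsistent with the data that actually arrived.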

burtonator

May 3, 2012, 3:11:53 PM
to peregrine...@googlegroups.com
Ah... OK. That is not good. We should file a bug for that.

This is something I realized early on and knew would be a problem, but most of my use cases involve small key/value pairs.

I'm going to need to figure out a solution for this.

Writing a test case would be easy.
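
As a rough sketch of what the inputs to such a test case might look like, the hypothetical helper below generates value sizes that straddle a chunk-size limit, plus deterministic payloads so that corruption after a shuffle would be detectable. The class and method names are invented for illustration, and the limit is passed in as a parameter because the actual value of MAX_HTTP_CHUNK_SIZE is not stated in this thread.

```java
import java.util.Arrays;

// Hypothetical test fixtures (not part of Peregrine's test suite).
class LargeValueFixtures {

    /**
     * Sizes just below, at, and above the limit, plus one spanning more than
     * two chunks. Assumes maxChunkSize is small enough that 2 * maxChunkSize + 1
     * does not overflow an int.
     */
    static int[] boundarySizes(int maxChunkSize) {
        return new int[] {
            maxChunkSize - 1, maxChunkSize, maxChunkSize + 1, 2 * maxChunkSize + 1
        };
    }

    /** Builds a value whose bytes follow a fixed pattern, so a reader can verify it. */
    static byte[] makeValue(int size) {
        byte[] value = new byte[size];
        for (int i = 0; i < size; i++) {
            value[i] = (byte) (i % 251);
        }
        return value;
    }

    public static void main(String[] args) {
        // 8 is a placeholder limit for demonstration only.
        System.out.println(Arrays.toString(boundarySizes(8)));
    }
}
```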

burtonator

May 3, 2012, 3:14:19 PM
to peregrine...@googlegroups.com
Ah... ok. I'm seeing similar things on my end.

I think one of the issues is that the integration suite is executing on SMALLER amounts of data.

I think I'm going to update this so that tests take longer but process much more data.

Further, I can easily tell it to run PageRank over, say, 1GB-100GB of data for one-off aggressive tests without a difficult-to-set-up testing environment.

The new refactor I did enables this... now we basically run tests JUST like they are executed in production.