Out of memory and number of shards exceeds limit


hmx...@gmail.com

Jul 3, 2014, 8:00:06 PM
to druid-de...@googlegroups.com
I have been trying to index 1.0 GB bz2 files with HadoopDruidIndexer but couldn't make it work.

These are the options I've tried:

1) With
  "partitionsSpec" : {
    "type" : "hashed",
    "targetPartitionSize" : 1000000,
    "maxPartitionSize" : 1500000,
    "assumeGrouped" : true
  },

I am getting an Out of Memory error even after I changed all memory parameters to the maximum values allowed:
Error: java.io.IOException: Map failed
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:849)
        at com.google.common.io.Files.map(Files.java:680)
        at com.google.common.io.Files.map(Files.java:666)
        at com.google.common.io.Files.map(Files.java:635)
        at com.google.common.io.Files.map(Files.java:609)
        at com.metamx.common.io.smoosh.SmooshedFileMapper.mapFile(SmooshedFileMapper.java:124)
        at io.druid.segment.IndexIO$DefaultIndexIOHandler.convertV8toV9(IndexIO.java:350)
        at io.druid.segment.IndexMerger.makeIndexFiles(IndexMerger.java:843)
        at io.druid.segment.IndexMerger.merge(IndexMerger.java:306)
        at io.druid.segment.IndexMerger.mergeQueryableIndex(IndexMerger.java:168)
        at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:372)
        at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:247)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:636)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:396)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:158)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1300)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:153)
Caused by: java.lang.OutOfMemoryError: Map failed
        at sun.nio.ch.FileChannelImpl.map0(Native Method)
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:846)


2) With
  "partitionsSpec" : {
    "type" : "hashed",
    "targetPartitionSize" : 300000,
    "maxPartitionSize" : 450000,
    "assumeGrouped" : true
  },

I am getting "Number of shards [329] exceed the maximum limit of [128]":

        File System Counters
                FILE: Number of bytes read=212872
                FILE: Number of bytes written=79740927
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=1210690603
                HDFS: Number of bytes written=18808
                HDFS: Number of read operations=1201
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=401
        Job Counters
                Killed map tasks=3
                Launched map tasks=203
                Launched reduce tasks=200
                Other local map tasks=42
                Data-local map tasks=101
                Rack-local map tasks=60
                Total time spent by all maps in occupied slots (ms)=213572352
                Total time spent by all reduces in occupied slots (ms)=125947408
        Map-Reduce Framework
                Map input records=106487384
                Map output records=200
                Merged Map outputs=40000
                GC time elapsed (ms)=134680
                CPU time spent (ms)=13432860
                Physical memory (bytes) snapshot=189270577152
                Virtual memory (bytes) snapshot=1439910318080
                Total committed heap usage (bytes)=182140207104
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=1210652603
        File Output Format Counters
                Bytes Written=18800

2014-07-03 23:36:35,527 INFO [main] io.druid.indexer.DetermineHashedPartitionsJob - Job completed, loading up partitions for intervals[Optional.of([2014-07-01T08:00:00.000Z/2014-07-01T09:00:00.000Z])].
2014-07-03 23:36:35,853 ERROR [main] io.druid.cli.CliHadoopIndexer - failure!!!!
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at io.druid.cli.CliHadoopIndexer.run(CliHadoopIndexer.java:104)
        at io.druid.cli.Main.main(Main.java:92)
Caused by: com.metamx.common.ISE: Number of shards [329] exceed the maximum limit of [128], either targetPartitionSize is too low or data volume is too high


3. "partitionsSpec" : {
    "type" : "hashed",
    "numShards": 200
  },

Then the job gets only one reducer and the process runs forever.

Could somebody suggest what would be the right way to go?

Thanks.

Deepak Jain

Jul 3, 2014, 11:54:48 PM
to druid-de...@googlegroups.com
"partitionsSpec": {
            "type": "hashed",
            "targetPartitionSize": 3000000
        },
and
"jobProperties": {
    "mapreduce.input.fileinputformat.split.maxsize":"60000000",
            "mapreduce.map.memory.mb": "3072",
            "mapreduce.map.java.opts": "-server -Xmx3g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xprof",
            "io.sort.record.percent": "0.08",
            "io.sort.mb": "1750",
            "io.sort.spill.percent": "1.0",
    "mapreduce.map.output.compress":"true",
    "mapreduce.map.output.compress.codec":"org.apache.hadoop.io.compress.Lz4Codec",  
            "mapreduce.reduce.memory.mb": "12288",
            "mapreduce.reduce.java.opts": "-server -Xmx12g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xprof",

    "tasktracker.http.threads":"100",
    "mapreduce.reduce.shuffle.parallelcopies":"8",
            "mapreduce.reduce.merge.memtomem.enabled":"true",
            "mapreduce.reduce.shuffle.input.buffer.percent":"1.0",
            "mapreduce.reduce.shuffle.merge.percent":"1.0",
            "mapreduce.reduce.shuffle.memory.limit.percent":"0.9",
            "mapreduce.reduce.memory.totalbytes":"6442450944",
            "mapreduce.reduce.merge.inmem.threshold":"0",
            "io.sort.factor":"50"
        },

I use the above config to ingest 1.2 billion rows (2047 gzipped files, each of size 26 MB). This starts multiple mappers and reducers and completes in 6 hours. My dataset has 50+ dimensions and 30+ aggregations.

1) Can you remove maxPartitionSize and assumeGrouped? (See the sketch below.)
2) Can you share your values for mapreduce.map.memory.mb and mapreduce.map.java.opts?
3) Please share the amount of time taken by the mappers, the reducers, and the entire job.
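
For reference, removing those two fields as in (1) would leave a hashed partitionsSpec like this (just a sketch, reusing the targetPartitionSize value from the original post):

  "partitionsSpec" : {
    "type" : "hashed",
    "targetPartitionSize" : 1000000
  },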

-Deepak

Deepak Jain

Jul 3, 2014, 11:55:47 PM
to druid-de...@googlegroups.com
Can you use the indexing service instead of HadoopDruidIndexer? With it you can submit jobs through its REST interface. (Recommended for a production environment.)

hmx...@gmail.com

Jul 4, 2014, 12:13:36 AM
to druid-de...@googlegroups.com
Thanks for the reply.

Here are the parameters I am using:

  <property>
    <name>mapred.child.java.opts</name>
    <value>-server -Xmx3072m -Djava.net.preferIPv4Stack=true</value>
    <description>No description</description>
  </property>

  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>8192</value>
  </property>

  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>8192</value>
  </property>

  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-server -Xmx3072m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
  </property>

  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-server -Xmx3072m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
  </property>


I got an error when I tried to set mapreduce.reduce.java.opts above 3 GB.
Let me give it a try with
 "mapreduce.reduce.memory.totalbytes":"6442450944",
 "mapreduce.reduce.memory.mb": "12288"

to see how it goes.
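
For reference, a sketch of how those reducer settings could be passed per job through the spec's jobProperties (following Deepak's example above) instead of the cluster-wide XML; the values are just the ones being discussed here:

  "jobProperties": {
    "mapreduce.reduce.memory.mb": "12288",
    "mapreduce.reduce.java.opts": "-server -Xmx12g -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.reduce.memory.totalbytes": "6442450944"
  },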

Nishant Bangarwa

Jul 4, 2014, 12:25:22 AM
to druid-de...@googlegroups.com
Hi, 

Caused by: com.metamx.common.ISE: Number of shards [329] exceed the maximum limit of [128], either targetPartitionSize is too low or data volume is too high

The exception above is due to a limit of 128 on the maximum number of shards. We removed this limit a while ago.
Which version of Druid are you using?
Can you try the latest stable, 0.6.121? That should not throw this error.




hmx...@gmail.com

Jul 4, 2014, 12:28:23 AM
to druid-de...@googlegroups.com
I am still using 0.6.72.

Nishant Bangarwa

Jul 4, 2014, 1:31:42 AM
to druid-de...@googlegroups.com
IIRC, numShards in partitionsSpec was introduced after 0.6.72;
try using the latest stable, 0.6.121.




hmx...@gmail.com

Jul 4, 2014, 2:23:00 AM
to druid-de...@googlegroups.com
I don't have the luxury of setting the values that high.

Our cluster limits map/reduce.memory.mb to 8 GB and java.opts -Xmx to 3 GB.

I guess I have to try the new version without the shard limit.


On Thursday, July 3, 2014 8:54:48 PM UTC-7, Deepak Jain wrote:

Fangjin Yang

Jul 4, 2014, 1:17:42 PM
to druid-de...@googlegroups.com
It is also interesting that you are generating so many shards. How many of these 1.0 GB files do you have? What is your rollup/segment granularity set to? I wonder about the possible performance impact of having so many shards for a given time range.

hmx...@gmail.com

Jul 4, 2014, 1:34:45 PM
to druid-de...@googlegroups.com
I have 200 of them and each one is 6 MB.

This is hourly aggregated data for a single hour, and I have
   "rollupGranularity": "hour"

Fangjin Yang

Jul 6, 2014, 12:27:29 AM
to druid-de...@googlegroups.com
Did the new version help with your problem? I am also very curious about the segment sizes you've generated once indexing actually completes. With your setup and data volume, you shouldn't need so many shards, and your segments in general are very small, which will impact performance quite a bit. If you merge all 200 files into a single file and rerun the indexing with a larger target partition size, do you still see the same out of memory problems?

hmx...@gmail.com

Jul 6, 2014, 1:55:08 AM
to druid-de...@googlegroups.com
I am still on the old version. I managed to raise the java.opts -Xmx to 7 GB and the load finally worked.

Now I have 116 partitions and each has a size of 90 MB (10.5 GB in total), while the source data is only 1.2 GB of bz2.

Are these sizes too small for queries?

Fangjin Yang

Jul 6, 2014, 1:57:37 AM
to druid-de...@googlegroups.com
For your data size, I suspect you only need 2 partitions. We typically try to target segments of 500 MB to 1 GB. Can you share your segment granularity? Can you also try bumping your target partition size up to something like 5M rows?
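
For example, something along these lines (a sketch using the same hashed partitionsSpec shape shown earlier in the thread):

  "partitionsSpec" : {
    "type" : "hashed",
    "targetPartitionSize" : 5000000
  },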



hmx...@gmail.com

Jul 6, 2014, 2:21:08 AM
to druid-de...@googlegroups.com
This is hourly data.

I have another hour where the source data is 4.0 GB of bz2. I used "targetPartitionSize" : 4200000.

It ended up with 76 partitions, each about 470 MB, 33 GB in total.

So the ratio of bz2 file size to total segment size is about 1:8.

Let me try reloading the 1.2 GB hour with a 5M target partition size to see how many partitions it generates; my estimate would be around 20.

hmx...@gmail.com

Jul 6, 2014, 4:03:06 AM
to druid-de...@googlegroups.com


As expected, it produced 20 partitions of about 500 MB each.

Fangjin Yang

Jul 7, 2014, 1:22:05 PM
to druid-de...@googlegroups.com
Those sizes seem much more reasonable.