Druid Batch Ingestion (type="index_hadoop") running in local mode - Increase memory settings?


Mark

Jun 9, 2016, 4:17:03 PM
to Druid User
I am currently running my Druid (0.8.3) batch ingestion in "local" mode due to library incompatibilities with the Hadoop distribution I am using (Hortonworks Ambari HDP 2.4.0, roughly Hadoop 2.7.1).

Unfortunately, I am getting the following OutOfMemoryError exception due to the large amount of data I am trying to index. How do I increase the memory settings for this task?


2016-06-08T01:42:07,991 ERROR [task-runner-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_datasource01_2016-06-07T23:18:03.684Z, type=index_hadoop, dataSource=datasource01}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
    at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:138) ~[druid-indexing-service-0.8.3.jar:0.8.3]
    at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:206) ~[druid-indexing-service-0.8.3.jar:0.8.3]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:285) [druid-indexing-service-0.8.3.jar:0.8.3]
    at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:265) [druid-indexing-service-0.8.3.jar:0.8.3]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_71]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_71]
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_71]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_71]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_71]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_71]
    at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:135) ~[druid-indexing-service-0.8.3.jar:0.8.3]
    ... 7 more
Caused by: java.lang.OutOfMemoryError: Java heap space



Note: In this case I am ingesting 2.7GB of CSV data, and the process uses a local working directory (/tmp/hadoop-<username>) that grows to around 18GB at its peak.


Background

Remote Hadoop Cluster (specifically Map/Reduce) issues:

Mark

Jun 9, 2016, 5:18:39 PM
to Druid User
Could the answer be as simple as updating the Java -Xmx option in the following Druid property?

druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC
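
If that is the right knob, raising the heap should just be a matter of editing the property and restarting the service that launches the tasks, e.g. (the 4g figure below is only an illustration, not a recommendation):

druid.indexer.runner.javaOpts=-server -Xmx4g -XX:+UseG1GC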

Mark

Jun 10, 2016, 12:10:07 PM
to Druid User
Would setting the job properties of my Druid batch ingestion task be a way to increase the memory settings?

For example:

"tuningConfig": {
  "type": "hadoop",
  "partitionsSpec": {
    "targetPartitionSize": 0
  },
  "numBackgroundPersistThreads" : 1,
  "jobProperties": {
    "mapreduce.map.memory.mb": 2048,
    "mapreduce.map.java.opts": "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
    "mapreduce.reduce.memory.mb": 6144,
    "mapreduce.reduce.java.opts":"-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8"
  }
}


A couple of links that suggest this could work:



Mark

Jun 10, 2016, 5:34:03 PM
to Druid User
This is what I have discovered.

I noticed that if I set the Druid logging level to INFO and viewed the running task logs, a MapReduce property ("mapreduce.task.io.sort.mb") was logged. By setting that property at each configuration level and executing a simple batch ingestion task, the logging output let me easily verify whether a given Hadoop property override had taken effect.

Druid Batch Ingestion Task - Local MapReduce Default Value:

2016-06-10T20:16:52,308 INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - mapreduce.task.io.sort.mb: 100
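
For anyone who needs to get that INFO-level output in the first place: Druid logs through log4j2, so a minimal console configuration with the root logger at info is enough. A sketch (the config/_common/log4j2.xml location is an assumption; adjust for your layout):

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
  <Appenders>
    <!-- Write to stdout so the task log captures the MapReduce property lines -->
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <!-- The info level is what surfaces lines like "mapreduce.task.io.sort.mb: 100" -->
    <Root level="info">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>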


Basically there are a few levels of precedence. If all three of the values given below are set, the "jobProperties" value is chosen if it exists, then the "druid.indexer.fork.property" value if it exists, then the "druid.indexer.runner.javaOpts" value if it exists; otherwise the default value is used. (A verification example follows the three levels below.)

Level 1 (Highest):  In the Druid Batch Ingestion task:

    "tuningConfig": {
        "type": "hadoop",
        "partitionsSpec": {
          "targetPartitionSize": 0
        },
        "numBackgroundPersistThreads" : 1,
        "useCombiner": true,
        "jobProperties": {
          "mapreduce.task.io.sort.mb": 201
        }
    }

Level 2: In the Druid configuration (druid/config/_common/common.runtime.properties file):

druid.indexer.fork.property.hadoop.mapreduce.task.io.sort.mb=202

Level 3: In the Druid configuration (druid/config/_common/common.runtime.properties file):

druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -Dhadoop.mapreduce.task.io.sort.mb=203
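
If the override is picked up, re-running the simple ingestion task should log the Level 1 value, i.e. the default-value line from earlier with 201 in place of 100 (the line below is the expected shape of the output, not a captured log):

<timestamp> INFO [LocalJobRunner Map Task Executor #0] org.apache.hadoop.mapred.MapTask - mapreduce.task.io.sort.mb: 201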






Fangjin Yang

Jun 16, 2016, 8:27:49 PM
to Druid User
Druid Hadoop-based ingestion simply submits a job to a Hadoop cluster, so the bottleneck is likely on the Hadoop cluster itself. Local Hadoop ingestion isn't really designed to handle ingesting a large volume of data; a remote Hadoop cluster is required for that. What is the error you are seeing with HDP?

Mark

Jun 24, 2016, 11:31:43 AM
to Druid User
For now, the "java.lang.OutOfMemoryError: Java heap space" exception I was receiving seems to have been addressed by the following configuration. I used http://druid.io/docs/latest/configuration/hadoop.html for inspiration.


"tuningConfig" : {
"type" : "hadoop",
"partitionsSpec" : {
"targetPartitionSize" : 0
},
"numBackgroundPersistThreads" : 1,
"useCombiner" : true,
"jobProperties" : {
"mapreduce.map.memory.mb" : 2048,
"mapreduce.map.java.opts" : "-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8",
"mapreduce.reduce.memory.mb" : 6144,
"mapreduce.reduce.java.opts" : "-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8",

"mapreduce.job.reduces" : 21,
"mapreduce.job.jvm.numtasks" : 20,
"mapreduce.reduce.shuffle.parallelcopies" : 50,
"mapreduce.reduce.shuffle.input.buffer.percent" : 0.5,
"mapreduce.task.io.sort.mb" : 250,
"mapreduce.task.io.sort.factor" : 100,
"mapreduce.jobtracker.handler.count" : 64,
"mapreduce.tasktracker.http.threads" : 20,

"mapreduce.output.fileoutputformat.compress" : false,
"mapreduce.output.fileoutputformat.compress.type" : "BLOCK",
"mapreduce.output.fileoutputformat.compress.codec" : "org.apache.hadoop.io.compress.DefaultCodec",
"mapreduce.map.output.compress" : true,
"mapreduce.map.output.compress.codec" : "org.apache.hadoop.io.compress.DefaultCodec",
"mapreduce.map.speculative" : false,
"mapreduce.reduce.speculative" : false,
"mapreduce.task.timeout" : 1800000
}
}
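
For completeness, a task spec containing this tuningConfig is submitted to the overlord in the usual way; a sketch (host and file name are placeholders; 8090 is the default overlord port):

curl -X POST -H 'Content-Type: application/json' -d @index_task.json http://<overlord-host>:8090/druid/indexer/v1/task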



Fangjin Yang

Jun 26, 2016, 8:48:09 PM
to Druid User
Answers duplicated with https://groups.google.com/forum/#!topic/druid-user/SFYlum_wu38.

Do not use local Hadoop ingestion for anything beyond a small POC data set and expect good performance.