I've successfully set up Druid and a Spec file to execute tasks on a remote Hadoop cluster, which was built using Google Dataproc.
The issue is that tasks execute successfully as long as the amount of input data specified in the inputSpec field of the Spec file is relatively small, but they start to fail as more input data is specified, with the following error:
2017-08-29T10:20:19,102 INFO [main] org.apache.hadoop.mapreduce.Job - map 100% reduce 98%
2017-08-29T10:22:46,879 INFO [main] org.apache.hadoop.mapreduce.Job - map 100% reduce 99%
2017-08-29T10:25:37,504 INFO [main] org.apache.hadoop.mapreduce.Job - map 100% reduce 100%
2017-08-29T10:25:44,537 INFO [main] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1503562969086_0031_m_000168_0, Status : FAILED
Container [pid=26154,containerID=container_1503562969086_0031_01_000176]
is running beyond physical memory limits. Current usage: 6.0 GB of 6 GB
physical memory used; 8.2 GB of 12.6 GB virtual memory used. Killing
container.
[...]
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
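As far as I can tell, exit code 143 itself isn't diagnostic: it's just 128 plus the signal number, i.e. the container was terminated with SIGTERM by the NodeManager after its memory check failed. A quick sanity check of that arithmetic:

```python
import signal

# YARN reports 128 + <signal number> when a container is killed by a signal.
# SIGTERM is signal 15 on POSIX systems, hence "exit code 143" in the log above.
exit_code = 128 + int(signal.SIGTERM)
print(exit_code)  # 143
```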
The failure happens at the end of the reduce task, so I assume that the issue is related to memory usage during reduce.
My first thought was to check the memory settings of the Java containers in the Hadoop cluster, in particular mapreduce.reduce.memory.mb, which should be the amount of memory requested from the scheduler for each reduce task. Indeed, this setting is 6 GB (6144 MB), matching the limit reported in the error.
I'm reporting below the settings of the Hadoop cluster to handle mapreduce, as configured by Google Dataproc:
mapreduce.job.maps: 40
mapreduce.map.cpu.vcores: 1
mapreduce.map.memory.mb: 3072
mapreduce.map.java.opts: -Xmx2457m
mapreduce.job.reduces: 16
mapreduce.reduce.cpu.vcores: 2
mapreduce.reduce.memory.mb: 6144
mapreduce.reduce.java.opts: -Xmx2560m
mapreduce.tasktracker.map.tasks.maximum: 4
mapreduce.input.fileinputformat.list-status.num-threads: 20
mapreduce.framework.name: yarn
yarn.app.mapreduce.am.command-opts: -Xmx4915m
yarn.app.mapreduce.am.resource.cpu-vcores: 2
yarn.app.mapreduce.am.resource.mb: 6144
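One thing I noticed while staring at these numbers is the ratio of heap (-Xmx) to the container's physical limit (memory.mb). Pure arithmetic on the values above:

```python
# Heap size (-Xmx, MB) vs container physical limit (memory.mb, MB),
# taken directly from the Dataproc settings listed above.
settings = {
    "map":    (2457, 3072),
    "reduce": (2560, 6144),
    "am":     (4915, 6144),
}

for name, (xmx, limit) in settings.items():
    print(f"{name}: heap is {xmx / limit:.0%} of the container limit")
# map:    80%
# reduce: 42%
# am:     80%
```

If that's right, the reduce containers are killed at 6 GB while their heap is capped at 2.5 GB, so more than half of the usage must be off-heap (native/direct memory or mapped files) rather than Java heap.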
Looking at various discussions on the web, people suggest in such cases increasing the physical memory of the containers, i.e. mapreduce.map.memory.mb and mapreduce.reduce.memory.mb. However, the task continues to fail with the same error, just reporting the new, higher limit that the container has exceeded.
My second approach was to assume that the Hadoop cluster is in fact properly configured by Google Dataproc (after all, increasing the configured container memory does not fix the issue), and that the problem instead lies in how I submit the Druid task. And here is where my question comes in.
The task is submitted through the CLI, as described in
http://druid.io/docs/latest/ingestion/command-line-hadoop-indexer.html. I report the Spec file content below:
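For completeness, the invocation is roughly of the following form (the classpath entries and paths are placeholders for my actual setup):

```
java -Xmx256m -Dfile.encoding=UTF-8 -Duser.timezone=UTC \
  -classpath "<druid-dir>/lib/*:<hadoop-config-dir>" \
  io.druid.cli.Main index hadoop <path-to-spec-file>
```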
{
  "type" : "index_hadoop",
  "hadoopDependencyCoordinates": [
    "org.apache.hadoop:hadoop-client:2.7.3"
  ],
  "spec" : {
    "dataSchema" : {
      "dataSource" : "<the-data-source>",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "iso"
          },
          "dimensionsSpec" : {
            <list-of-dimension-specs>
          }
        }
      },
      "metricsSpec" : [
        <list-of-metric-specs>
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "HOUR",
        "queryGranularity" : "HOUR",
        "intervals" : [
          "2017-08-02T10:00:00.000Z/2017-08-02T10:59:59.000Z"
        ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type": "static",
        "paths": <list-of-input-files>
      },
      "metadataUpdateSpec" : {
        "type": "postgresql",
        "connectURI": "jdbc:postgresql://<host>:5432/druid",
        "user": "<druid-db-user>",
        "password": "<secret-password>",
        "segmentTable": "druid_segments"
      },
      "segmentOutputPath" : "<output-location>"
    },
    "tuningConfig" : {
      "type" : "hadoop",
      "jobProperties": {
        "mapreduce.job.classloader": "true"
      },
      "maxRowsInMemory": 2500,
      "numShards": 5,
      "workingPath": "/tmp"
    }
  }
}
Since the tasks succeed if I reduce the input data by a factor of ~5, I also tried specifying more shards for task execution (see the numShards field in the tuningConfig of the Spec file), but this didn't help. I also tried reducing the maximum number of rows held in memory before the task output is spilled to disk (maxRowsInMemory), and this didn't help either.
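If it matters, my understanding is that Hadoop properties can be overridden per job from the Spec itself, since tuningConfig.jobProperties is forwarded to the Hadoop job configuration; memory-related overrides would look something like this (the property names are standard Hadoop ones, the values purely illustrative):

```json
"tuningConfig" : {
  "type" : "hadoop",
  "jobProperties" : {
    "mapreduce.job.classloader": "true",
    "mapreduce.reduce.memory.mb": "8192",
    "mapreduce.reduce.java.opts": "-Xmx6144m"
  }
}
```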
I've also tried executing the task through the middleManager service instead of the CLI, with the same results (so the issue should not be related to CLI usage).
I'm convinced that I should be able to properly tune the task through the Spec, though I have tried every reasonable change. Has any of you experienced a similar issue when executing tasks on a remote Hadoop cluster?
Thank you in advance for any hint.
Giuseppe.