We have been using Google Cloud Storage (GCS) and Dataproc for more than a year now; this is exactly the use case for which I wrote the GCS support.
All of our raw CSV files and segments live in GCS buckets. Once a day we programmatically create a new Dataproc cluster, index all of yesterday's CSV files, and then delete the Dataproc cluster again. This way we only pay for the minutes we actually use the cluster.
We always keep druid.storage.type set to google.
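For reference, the deep-storage part of our runtime.properties looks roughly like this. The bucket and prefix names below are placeholders, not our real ones:

```
# Load the GCS extension (any other extensions you use go in the same list).
druid.extensions.loadList=["druid-google-extensions"]

druid.storage.type=google
# Placeholder bucket and prefix -- use your own.
druid.google.bucket=my-druid-segments
druid.google.prefix=druid/segments
```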
When building our Druid version, we copied the Hadoop config from a Dataproc cluster into the Druid config (/etc/hadoop/conf/* from the Dataproc master to druid/config/hadoop/) and modified it to point at the hostname we always use for our Dataproc cluster (mapreduce.jobhistory.address). I suspect this can also be set as an argument on the Hadoop job you create.
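If you'd rather not edit the copied Hadoop config, the same endpoints can probably be passed as job properties in the task's tuningConfig instead. A hedged sketch — the hostname below is a placeholder for whatever name your Dataproc master always gets, and 10020 is the default job-history port:

```json
"tuningConfig": {
  "type": "hadoop",
  "jobProperties": {
    "fs.defaultFS": "hdfs://my-dataproc-m",
    "yarn.resourcemanager.hostname": "my-dataproc-m",
    "mapreduce.jobhistory.address": "my-dataproc-m:10020"
  }
}
```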
To fix compatibility issues we actually have Dataproc use the Jackson libs from Druid, using a script similar to this one: https://gist.github.com/erikdubbelboer/196f28e274ed7363858d8e6b8d4a5356
In other words, we have a Dataproc initialization action that replaces the Dataproc Jackson libs with the ones our Druid version was built with.
In the Druid Hadoop job we submit we set mapreduce.job.user.classpath.first = true, and we set hadoopDependencyCoordinates to ["org.apache.hadoop:hadoop-client:2.7.3"].
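Put together, the relevant parts of the task spec we submit look roughly like this. This is an abbreviated sketch (the dataSource is a placeholder and most of dataSchema is omitted); hadoopDependencyCoordinates sits at the top level of the task, next to spec:

```json
{
  "type": "index_hadoop",
  "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3"],
  "spec": {
    "dataSchema": { "dataSource": "example" },
    "tuningConfig": {
      "type": "hadoop",
      "jobProperties": {
        "mapreduce.job.user.classpath.first": "true"
      }
    }
  }
}
```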
Since all our raw CSV files are also stored in GCS, we use the following ioConfig:
"ioConfig" : {
"type": "hadoop",
"inputSpec": {
"type": "static",
"paths": "gs://another-bucket/foo.gz,gs://another-bucket/bar.gz"
}
This has been possible since https://github.com/druid-io/druid/pull/2645.
Let me know if you need to know more.
2017-07-03T15:17:20,049 WARN [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobResourceUploader - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2017-07-03T15:17:20,061 WARN [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobResourceUploader - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2017-07-03T15:17:20,946 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2017-07-03T15:17:21,035 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobSubmitter - number of splits:1
2017-07-03T15:17:21,218 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobSubmitter - Submitting tokens for job: job_1498643757933_0004
2017-07-03T15:17:21,410 INFO [task-runner-0-priority-0] org.apache.hadoop.mapred.YARNRunner - Job jar is not present. Not adding any jar to the list of resources.
2017-07-03T15:17:22,229 INFO [task-runner-0-priority-0] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1498643757933_0004
2017-07-03T15:17:22,286 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - The url to track the job: http://test-hadoop-batch-ingestion-m:8088/proxy/application_1498643757933_0004/
2017-07-03T15:17:22,286 INFO [task-runner-0-priority-0] io.druid.indexer.IndexGeneratorJob - Job 666-bi-index-generator-Optional.of([2017-06-27T04:00:00.000Z/2017-06-27T05:00:00.000Z]) submitted, status available at http://test-hadoop-batch-ingestion-m:8088/proxy/application_1498643757933_0004/
2017-07-03T15:17:22,287 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Running job: job_1498643757933_0004
2017-07-03T15:17:28,368 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1498643757933_0004 running in uber mode : false
2017-07-03T15:17:28,370 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 0% reduce 0%
2017-07-03T15:17:33,830 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1498643757933_0004_m_000000_0, Status : FAILED
Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
2017-07-03T15:17:39,885 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1498643757933_0004_m_000000_1, Status : FAILED
Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
2017-07-03T15:17:45,919 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1498643757933_0004_m_000000_2, Status : FAILED
Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
2017-07-03T15:17:52,959 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 100% reduce 100%
2017-07-03T15:17:52,967 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1498643757933_0004 failed with state FAILED due to: Task failed task_1498643757933_0004_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0
2017-07-03T15:17:53,056 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Counters: 16
	Job Counters
		Failed map tasks=4
		Killed reduce tasks=1
		Launched map tasks=4
		Other local map tasks=3
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=49719
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=16573
		Total time spent by all reduce tasks (ms)=0
		Total vcore-milliseconds taken by all map tasks=16573
		Total vcore-milliseconds taken by all reduce tasks=0
		Total megabyte-milliseconds taken by all map tasks=50912256
		Total megabyte-milliseconds taken by all reduce tasks=0
	Map-Reduce Framework
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
2017-07-03T15:17:53,065 INFO [task-runner-0-priority-0] io.druid.indexer.JobHelper - Deleting path[/tmp/druid-indexing/666-bi/2017-07-03T151702.509Z_f4cf5d967db44766a17f3908fac3ce60]
2017-07-03T15:17:53,098 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_666-bi_2017-07-03T15:16:12.342Z, type=index_hadoop, dataSource=666-bi}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:211) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:223) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	... 7 more
Caused by: io.druid.java.util.common.ISE: Job[class io.druid.indexer.IndexGeneratorJob] failed!
	at io.druid.indexer.JobHelper.runJobs(JobHelper.java:370) ~[druid-indexing-hadoop-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:95) ~[druid-indexing-hadoop-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:276) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	... 7 more
2017-07-03T15:17:53,110 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_hadoop_666-bi_2017-07-03T15:16:12.342Z] status changed to [FAILED].
2017-07-03T15:17:53,113 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: { "id" : "index_hadoop_666-bi_2017-07-03T15:16:12.342Z", "status" : "FAILED", "duration" : 41767 }
Do you have any idea why the container task is killed, based on your experience with Dataproc?
Thanks and regards,
Giuseppe.
> In the Druid Hadoop job we submit we use: mapreduce.job.user.classpath.first = true
Great that you got it working.
We do:
"tuningConfig" : {
"type" : "hadoop",
"jobProperties": {
"mapreduce.job.user.classpath.first": "true"
}
And we leave mapreduce.job.classloader at its default. I also tried what you are doing but somehow never got that working. We use the default Hadoop version from pom.xml, so that might have something to do with it.