Running Druid batch ingestion tasks on a remote Google Dataproc cluster

g.broc...@gmail.com

Jun 30, 2017, 11:46:29 AM
to Druid User
Hi all,

I currently have a Druid cluster (Overlord, MiddleManagers, Coordinator and Brokers) that successfully performs local batch ingestion of files located in a Google Storage bucket and places the produced index segments in another Google Storage bucket. I'd now like to implement a new solution based on remote execution in a Hadoop cluster provided by Google Dataproc (https://cloud.google.com/dataproc/docs/). The Hadoop version present on the Dataproc nodes is 2.3.0, and the Druid version is 0.9.2.

Currently, batch ingestion of files goes through the "druid-google-extensions" extension, so the produced segments are placed in a Google bucket as specified in the common.runtime.properties configuration:

# Deep storage
# Google Cloud Storage configuration
druid.storage.type=google
druid.google.bucket=the-bucket
druid.google.prefix=the-path

and the JSON spec for batch ingestion is configured like this:

"ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "ingestSegment",
        "paths" : "gs://another-bucket/file.gz"
      }
}

Now I'd like to obtain the same result by executing tasks on a remote Hadoop cluster. The idea is to use Google Dataproc clusters because it is relatively easy (just one command) to create a Hadoop cluster that is already configured and ready to execute Hadoop jobs, and (of course) because our data lives in Google Storage.

I've already studied some strategies for remote execution of batch ingestion with Hadoop, and my understanding is that I should follow the instructions in http://druid.io/docs/latest/ingestion/batch-ingestion.html, in particular the section "Remote Hadoop Cluster". So the idea is to copy all the XML configuration files of the Dataproc Hadoop cluster into _common/ and to adapt the JSON ingestion spec accordingly.

What I'd like to ask anyone who has experience integrating Druid with remote Google Dataproc clusters is this: druid.storage.type has to be set to 'hdfs', and the Google bucket that receives the index segments has to be visible from the Hadoop FS, but can the Google firehose still be used in ioConfig? If not, what is the alternative: importing the files locally into the Druid cluster, or using a single bucket as HDFS deep storage?

I hope someone has already been able to implement this :)

Regards,
Giuseppe.

Erik Dubbelboer

Jul 1, 2017, 12:20:57 AM
to Druid User
We have been using Google Cloud Storage (GCS) and Dataproc for more than a year now. This was exactly the reason why I wrote the GCS support.

We have all of our raw CSV files and segments in GCS buckets. Once a day we programmatically create a new Dataproc cluster and index all CSV files from yesterday. Afterwards we kill the Dataproc cluster again. This way we only pay for the actual minutes we use the cluster.

We always keep druid.storage.type set to google.

When building our Druid version we copied the Hadoop config from a Dataproc cluster into the Druid config (/etc/hadoop/conf/* from the Dataproc master to druid/config/hadoop/) and modified it to point to the hostname we always use for our Dataproc cluster (mapreduce.jobhistory.address). I guess this can also be set as arguments to the Hadoop job you create.
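
As an untested sketch of the guess above: Druid's Hadoop tuningConfig accepts a jobProperties map, so the hostname could perhaps be passed per task instead of being edited into the copied XML files. The hostname "hypothetical-dataproc-m" is a placeholder, 10020 is only the stock Hadoop default port for the job history server, and whether a given cluster needs further addresses overridden is an assumption to verify.

```json
{
  "tuningConfig": {
    "type": "hadoop",
    "jobProperties": {
      "mapreduce.jobhistory.address": "hypothetical-dataproc-m:10020"
    }
  }
}
```

The outer braces are only there to make the fragment valid JSON on its own; in a real task the tuningConfig object sits inside the task's "spec".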

To fix compatibility issues we actually have Dataproc use the Jackson libs from Druid. We use a script similar to this: https://gist.github.com/erikdubbelboer/196f28e274ed7363858d8e6b8d4a5356
So we have a Dataproc initialization action that just replaces the Dataproc Jackson libs with the ones from when we build our Druid version.

In the Druid Hadoop job we submit, we set mapreduce.job.user.classpath.first = true
and hadoopDependencyCoordinates to ["org.apache.hadoop:hadoop-client:2.7.3"]

Since all our raw CSV files are also stored in GCS, we use the following ioConfig:

"ioConfig" : {
  "type": "hadoop",
  "inputSpec": {
    "type": "static",
    "paths": "gs://another-bucket/foo.gz,gs://another-bucket/bar.gz"
  }
}


This has been possible since https://github.com/druid-io/druid/pull/2645
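
Put together, the settings described above might fit into a Hadoop index task roughly as follows. This is a minimal sketch, not our actual spec: the data source name, column names, metrics and interval are all hypothetical placeholders, and only the hadoopDependencyCoordinates, the ioConfig and the jobProperties entry come from this message.

```json
{
  "type": "index_hadoop",
  "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.3"],
  "spec": {
    "dataSchema": {
      "dataSource": "hypothetical_datasource",
      "parser": {
        "type": "hadoopyString",
        "parseSpec": {
          "format": "csv",
          "timestampSpec": { "column": "timestamp", "format": "auto" },
          "columns": ["timestamp", "hypothetical_dimension", "hypothetical_value"],
          "dimensionsSpec": { "dimensions": ["hypothetical_dimension"] }
        }
      },
      "metricsSpec": [
        { "type": "count", "name": "count" },
        { "type": "doubleSum", "name": "value_sum", "fieldName": "hypothetical_value" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE",
        "intervals": ["2017-06-27T04:00:00.000Z/2017-06-27T05:00:00.000Z"]
      }
    },
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "paths": "gs://another-bucket/foo.gz,gs://another-bucket/bar.gz"
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "jobProperties": {
        "mapreduce.job.user.classpath.first": "true"
      }
    }
  }
}
```

Note that hadoopDependencyCoordinates sits at the top level of the task, next to "spec", while the Hadoop job properties go under tuningConfig.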

Let me know if you need to know more.

Cheers,
Erik

Giuseppe Broccolo

Jul 3, 2017, 12:09:11 PM
to druid...@googlegroups.com
Hi Erik,

First of all: thanks for your reply, really appreciated! :)

2017-07-01 6:20 GMT+02:00 Erik Dubbelboer <er...@dubbelboer.com>:
We have been using Google Cloud Storage (GCS) and Dataproc for more than a year now. This was exactly the reason why I wrote the GCS support.

We have all of our raw CSV files and segments in GCS buckets. Once a day we programmatically create a new Dataproc cluster and index all CSV files from yesterday. After which we kill the Dataproc cluster again. This way we only have to pay for the actual minutes we use the cluster.

This is indeed what I'd like to achieve! :)
 
I'm glad to hear I was following the right strategy, though the tasks fail. Let me briefly summarize what I did to set up remote execution of batch tasks on the Dataproc cluster:

* Since Dataproc is based on Hadoop 2.7.3, I made the following change in src/pom.xml:

           <hadoop.compile.version>2.3.0</hadoop.compile.version> --> <hadoop.compile.version>2.7.3</hadoop.compile.version>

  and then I rebuilt Druid and pulled the dependencies (see druid.extensions.loadList below)

* The common.runtime.properties of the Overlord contains the following configuration:
 
           druid.extensions.loadList=["druid-google-extensions","druid-kafka-eight","postgresql-metadata-storage"]
           druid.storage.type=google
           druid.google.bucket=a-bucket
           druid.google.prefix=any-path
           mapreduce.job.user.classpath.first = true

* I copied the Hadoop XML configuration from the Dataproc master node (/etc/hadoop/conf/*) to the druid/conf/hadoop directory on the Overlord node

* I then replaced the Jackson jars on all nodes of the Hadoop cluster (in /usr/lib/hadoop-mapreduce) with the ones from the Druid build, found in the lib/ directory on the Overlord node

* I added the gcs-connector to the lib/ directory of the Overlord node (https://cloud.google.com/dataproc/docs/connectors/cloud-storage); otherwise I wasn't able to read the input log files from the input bucket

* The JSON configuration of the task accesses the input bucket by specifying gs:// in the path

I'm able to submit the task, but I get the following error during the map phase of the task:

2017-07-03T15:17:20,049 WARN [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobResourceUploader - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2017-07-03T15:17:20,061 WARN [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobResourceUploader - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2017-07-03T15:17:20,946 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2017-07-03T15:17:21,035 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobSubmitter - number of splits:1
2017-07-03T15:17:21,218 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobSubmitter - Submitting tokens for job: job_1498643757933_0004
2017-07-03T15:17:21,410 INFO [task-runner-0-priority-0] org.apache.hadoop.mapred.YARNRunner - Job jar is not present. Not adding any jar to the list of resources.
2017-07-03T15:17:22,229 INFO [task-runner-0-priority-0] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1498643757933_0004
2017-07-03T15:17:22,286 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - The url to track the job: http://test-hadoop-batch-ingestion-m:8088/proxy/application_1498643757933_0004/
2017-07-03T15:17:22,286 INFO [task-runner-0-priority-0] io.druid.indexer.IndexGeneratorJob - Job 666-bi-index-generator-Optional.of([2017-06-27T04:00:00.000Z/2017-06-27T05:00:00.000Z]) submitted, status available at http://test-hadoop-batch-ingestion-m:8088/proxy/application_1498643757933_0004/
2017-07-03T15:17:22,287 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Running job: job_1498643757933_0004
2017-07-03T15:17:28,368 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1498643757933_0004 running in uber mode : false
2017-07-03T15:17:28,370 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job -  map 0% reduce 0%
2017-07-03T15:17:33,830 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1498643757933_0004_m_000000_0, Status : FAILED
Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
2017-07-03T15:17:39,885 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1498643757933_0004_m_000000_1, Status : FAILED
Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
2017-07-03T15:17:45,919 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1498643757933_0004_m_000000_2, Status : FAILED
Error: com.google.inject.util.Types.collectionOf(Ljava/lang/reflect/Type;)Ljava/lang/reflect/ParameterizedType;
2017-07-03T15:17:52,959 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job -  map 100% reduce 100%
2017-07-03T15:17:52,967 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1498643757933_0004 failed with state FAILED due to: Task failed task_1498643757933_0004_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

2017-07-03T15:17:53,056 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Counters: 16
	Job Counters 
		Failed map tasks=4
		Killed reduce tasks=1
		Launched map tasks=4
		Other local map tasks=3
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=49719
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=16573
		Total time spent by all reduce tasks (ms)=0
		Total vcore-milliseconds taken by all map tasks=16573
		Total vcore-milliseconds taken by all reduce tasks=0
		Total megabyte-milliseconds taken by all map tasks=50912256
		Total megabyte-milliseconds taken by all reduce tasks=0
	Map-Reduce Framework
		CPU time spent (ms)=0
		Physical memory (bytes) snapshot=0
		Virtual memory (bytes) snapshot=0
2017-07-03T15:17:53,065 INFO [task-runner-0-priority-0] io.druid.indexer.JobHelper - Deleting path[/tmp/druid-indexing/666-bi/2017-07-03T151702.509Z_f4cf5d967db44766a17f3908fac3ce60]
2017-07-03T15:17:53,098 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[HadoopIndexTask{id=index_hadoop_666-bi_2017-07-03T15:16:12.342Z, type=index_hadoop, dataSource=666-bi}]
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:211) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:223) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	... 7 more
Caused by: io.druid.java.util.common.ISE: Job[class io.druid.indexer.IndexGeneratorJob] failed!
	at io.druid.indexer.JobHelper.runJobs(JobHelper.java:370) ~[druid-indexing-hadoop-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:95) ~[druid-indexing-hadoop-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:276) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_131]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_131]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_131]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_131]
	at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
	... 7 more
2017-07-03T15:17:53,110 INFO [task-runner-0-priority-0] io.druid.indexing.overlord.TaskRunnerUtils - Task [index_hadoop_666-bi_2017-07-03T15:16:12.342Z] status changed to [FAILED].
2017-07-03T15:17:53,113 INFO [task-runner-0-priority-0] io.druid.indexing.worker.executor.ExecutorLifecycle - Task completed with status: {
  "id" : "index_hadoop_666-bi_2017-07-03T15:16:12.342Z",
  "status" : "FAILED",
  "duration" : 41767
}
Based on your experience with Dataproc, do you have any idea why the container task is killed?

Thanks and regards,
Giuseppe.

Giuseppe Broccolo

Jul 4, 2017, 10:32:13 AM
to druid...@googlegroups.com
Hi all,

I have finally been able to fix the issue, and to properly submit the job to the Dataproc cluster!

The procedure reported in my previous message works, but

2017-07-01 6:20 GMT+02:00 Erik Dubbelboer <er...@dubbelboer.com>:
In the Druid Hadoop job we submit we use: mapreduce.job.user.classpath.first = true

I was adding this setting only to the Overlord configuration. To make it work and avoid the error 143, I also had
to add the following to the JSON configuration of the task:

  "tuningConfig" : {
      "type" : "hadoop",
      "jobProperties": {
          "mapreduce.job.classloader": "true"
      }
  }

Thanks for your help again, Erik!!

All the best,
Giuseppe.

Erik Dubbelboer

Jul 6, 2017, 9:58:44 AM
to Druid User
Great that you got it working.

We do:
 
   "tuningConfig" : {
      "type" : "hadoop",
      "jobProperties": {
          "mapreduce.job.user.classpath.first": "true"
      }
   }

And we leave mapreduce.job.classloader at its default. I also tried what you are doing but somehow never got it working. We use the default Hadoop version in pom.xml, so that might have something to do with it.


Giuseppe Broccolo

Jul 7, 2017, 10:35:37 AM
to druid...@googlegroups.com
Hi Erik,

2017-07-06 15:58 GMT+02:00 Erik Dubbelboer <er...@dubbelboer.com>:

And we leave mapreduce.job.classloader as the default. I also tried what you are doing but somehow never got that working. We use the default hadoop version in pom.xml so that might have something to do with it.

Yeah, the Hadoop version change in the pom.xml:

       <hadoop.compile.version>2.7.3</hadoop.compile.version>

was necessary to make it work.

Many thanks again for your suggestions,
Giuseppe.