Druid as OutputFormat for Hadoop


dav...@blismedia.com

May 20, 2014, 8:35:49 AM
to druid-de...@googlegroups.com
Hi All,
I am new to Druid; I managed to run the tutorials and get some queries to run successfully. I now want to try to scale the example up to something closer to a "production" scenario.

I have 2 log file types (for each hour, several files are generated for each of the two types). To build the data I want to ingest into Druid, I wrote a simple join in Hadoop, wrote the output to disk and then indexed the file. While this worked as a proof of concept, it is nowhere near a scalable solution for the amount of data I have to deal with every hour (roughly 10 GB).

My idea was to push the data directly into Druid from the Hadoop job, instead of going through the temporary file. Would that be possible? Is there a way to access the ingestion API from a Hadoop job?

Thanks,
Davide





Fangjin Yang

May 20, 2014, 7:56:17 PM
to druid-de...@googlegroups.com
Hi Davide,

Druid does have built-in batch Hadoop ingestion. Druid only understands denormalized data, so what we typically do is format the data at ETL time into something Druid understands (TSV, CSV, JSON) and ingest it using batch Hadoop ingestion.
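
For example, the output of your join would just be flat rows, one event per line, with the timestamp and all dimensions and metrics already joined in. Something like this (field names invented for illustration):

{"timestamp":"2014-05-20T07:00:00Z","campaign_id":"1234","country":"GB","device":"mobile","impressions":1,"clicks":0}

Druid then rolls these rows up at ingestion time according to your aggregators and query granularity.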

Let me know if this makes sense.

-- FJ

Nishant Bangarwa

May 21, 2014, 5:07:36 AM
to druid-de...@googlegroups.com
Hi Davide, 

You might also be interested in looking at Tranquility (https://github.com/metamx/tranquility), which helps you send event streams to Druid.



dav...@blismedia.com

Jun 5, 2014, 3:29:57 AM
to druid-de...@googlegroups.com
Hi FJ and Nishant,
my apologies for the very late reply!

My first attempt was to denormalize the data using a Hadoop job, as suggested by FJ, and then fire off a curl request to an overlord node, as described at the bottom of this page: http://druid.io/docs/0.6.120/Batch-ingestion.html (Batch Ingestion Using the Indexing Service). However, in my particular case, as the data can be quite big (several GB), I probably need the HadoopDruidIndexer? I am a bit confused by all the possible combinations of ingestion techniques: could you give me a brief rundown?

Regarding Tranquility, is this project specifically targeted at real-time ingestion? Our approach at the moment is to start with a proof of concept using batch ingestion, and eventually add a real-time pipeline in a second stage.

Thanks,
Davide

Gian Merlino

Jun 5, 2014, 11:50:07 AM
to druid-de...@googlegroups.com
The intended batch-mode workflow for the indexing service is something like this:

(1) Load your data into HDFS somehow. This might be done with some external process that simply writes logs to HDFS, or it might be another Hadoop job that does something like partitioning, joins, or pre-aggregations.
(2) Send an "index_hadoop" task to the overlord telling it where the data is in HDFS.
(3) The overlord will spawn a Hadoop job that indexes your data, writes segments to your deep storage (which could also be HDFS, or could be something else), and arranges for them to be loaded into the cluster for querying.
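
Concretely, step (2) is just an HTTP POST to the overlord. A rough sketch (host, port, and paths are placeholders, and the field names are from the 0.6.x batch ingestion page Davide linked, so double-check them against your version):

curl -X POST 'http://<overlord_host>:<port>/druid/indexer/v1/task' \
  -H 'Content-Type: application/json' \
  -d @hadoop_index_task.json

where hadoop_index_task.json wraps your HadoopDruidIndexer config, something like:

{
  "type": "index_hadoop",
  "config": {
    "dataSource": "your_datasource",
    "timestampSpec": {"column": "timestamp", "format": "auto"},
    "dataSpec": {"format": "json", "dimensions": ["dim1", "dim2"]},
    "granularitySpec": {"type": "uniform", "gran": "hour", "intervals": ["2014-04-03T00:00:00Z/2014-04-04T00:00:00Z"]},
    "pathSpec": {"type": "static", "paths": "hdfs://<namenode>/path/to/denormalized/data"},
    "rollupSpec": {"aggs": [{"type": "count", "name": "count"}], "rollupGranularity": "minute"}
  }
}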

Tranquility is designed purely for pushing events to Druid in real time, so it might be useful to you in the future, but not for a batch POC.

dav...@blismedia.com

Jun 6, 2014, 6:07:58 AM
to druid-de...@googlegroups.com
Hi,
at the moment I am not using an overlord node; I am trying to get the HadoopDruidIndexer working, using HDFS as deep storage for Druid (ideally this should make the handover to the cluster after ingestion very seamless).

I am facing two problems:
1. The segment timestamps get the BST timezone, while the timestamps in the data are UTC (and I would like Druid to treat them as UTC).

At the end of the indexing, I see:

2014-06-06 09:54:25,361 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T10:00:00.000+01:00_2014-04-03T11:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
2014-06-06 09:54:25,364 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T12:00:00.000+01:00_2014-04-03T13:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
2014-06-06 09:54:25,369 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T13:00:00.000+01:00_2014-04-03T14:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments

2. I cannot get the HadoopDruidIndexer to write the segment metadata to the MySQL database

My conf is the following:

   "metadataUpdateSpec":{
      "type":"db",
      "connectURI":"jdbc:mysql:\/\/localhost:3306\/druid",
      "user":"druid",
      "password":"diurd",
      "segmentTable":"druid_segments"
   }

I am currently working on a local test cluster on my workstation: are there any limitations with this setup?

D.

Nishant Bangarwa

Jun 6, 2014, 8:28:04 AM
to druid-de...@googlegroups.com
Hi Davide, 
See Inline 


On Fri, Jun 6, 2014 at 3:37 PM, <dav...@blismedia.com> wrote:
Hi,
at the moment I am not using an overlord node; I am trying to get the HadoopDruidIndexer working, using HDFS as deep storage for Druid (ideally this should make the handover to the cluster after ingestion very seamless).

I am facing two problems:
1. The segment timestamps get the BST timezone, while the timestamps in the data are UTC (and I would like Druid to treat them as UTC).

At the end of the indexing, I see:

2014-06-06 09:54:25,361 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T10:00:00.000+01:00_2014-04-03T11:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
2014-06-06 09:54:25,364 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T12:00:00.000+01:00_2014-04-03T13:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
2014-06-06 09:54:25,369 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T13:00:00.000+01:00_2014-04-03T14:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
You can configure your mappers and reducers to run in the UTC timezone.
In mapred-site.xml you can set the JVM options passed to the mappers and reducers;
e.g. for Hadoop 2.x you can add these:
  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
  </property>

 

2. I cannot get the HadoopDruidIndexer to write the segment metadata to the MySQL database

My conf is the following:

   "metadataUpdateSpec":{
      "type":"db",
      "connectURI":"jdbc:mysql:\/\/localhost:3306\/druid",
      "user":"druid",
      "password":"diurd",
      "segmentTable":"druid_segments"
   }

I am currently working on a local test cluster on my workstation: are there any limitations with this setup?
We have seen timezone mismatches between Hadoop and Druid lead to failing tasks, so I wonder if this failure is also related. Can you try setting the Hadoop timezone to UTC?
If it still fails, check the task logs for any exceptions.


Davide Anastasia

Jun 6, 2014, 8:37:01 AM
to druid-de...@googlegroups.com
Hi again,



On 6 June 2014 13:28, Nishant Bangarwa <nishant....@metamarkets.com> wrote:
Hi Davide, 
See Inline 


On Fri, Jun 6, 2014 at 3:37 PM, <dav...@blismedia.com> wrote:
Hi,
at the moment I am not using an overlord node; I am trying to get the HadoopDruidIndexer working, using HDFS as deep storage for Druid (ideally this should make the handover to the cluster after ingestion very seamless).

I am facing two problems:
1. The segment timestamps get the BST timezone, while the timestamps in the data are UTC (and I would like Druid to treat them as UTC).

At the end of the indexing, I see:

2014-06-06 09:54:25,361 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T10:00:00.000+01:00_2014-04-03T11:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
2014-06-06 09:54:25,364 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T12:00:00.000+01:00_2014-04-03T13:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
2014-06-06 09:54:25,369 INFO [main] io.druid.indexer.IndexGeneratorJob - Adding segment campaign_report_2014-04-03T13:00:00.000+01:00_2014-04-03T14:00:00.000+01:00_2014-06-06T09:52:49.681+01:00 to the list of published segments
You can configure your mappers and reducers to run in the UTC timezone.
In mapred-site.xml you can set the JVM options passed to the mappers and reducers;
e.g. for Hadoop 2.x you can add these:
  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-server -Xmx1536m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-server -Xmx2560m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
  </property>


I'll try this now, but I am currently using Hadoop 1.2.1 on my workstation. Upgrading would probably not be too complicated, but I have tried in the past and was bitten by the API changes, so I reverted to my current version.
At some point, my test environment will be moved to AWS: will I have the same problem there?
 
 

2. I cannot get the HadoopDruidIndexer to write the segment metadata to the MySQL database

My conf is the following:

   "metadataUpdateSpec":{
      "type":"db",
      "connectURI":"jdbc:mysql:\/\/localhost:3306\/druid",
      "user":"druid",
      "password":"diurd",
      "segmentTable":"druid_segments"
   }

I am currently working on a local test cluster on my workstation: are there any limitations with this setup?
We have seen timezone mismatches between Hadoop and Druid lead to failing tasks, so I wonder if this failure is also related. Can you try setting the Hadoop timezone to UTC?
If it still fails, check the task logs for any exceptions.

The error was that I used metadataUpdateSpec, while the key should instead be updaterJobSpec. Once I switched to the correct key, the indexer stored the segment metadata into MySQL correctly.
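
For reference, the working block is identical apart from the key name (same values as before; this is in the standalone HadoopDruidIndexer config, and I have not checked how the key is nested when going through the indexing service):

   "updaterJobSpec":{
      "type":"db",
      "connectURI":"jdbc:mysql://localhost:3306/druid",
      "user":"druid",
      "password":"diurd",
      "segmentTable":"druid_segments"
   }

Unfortunately, I am now running into this problem: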


I am using Druid 0.6.73: would upgrading to the latest version help me with this without recompiling?

Cheers,
D.

 

Nishant Bangarwa

Jun 6, 2014, 8:44:54 AM
to druid-de...@googlegroups.com
For Hadoop 1.x the property is named mapred.child.java.opts, e.g. in mapred-site.xml:
   <property>
        <name>mapred.child.java.opts</name>
        <value>-Xmx2192m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps</value>
    </property>

It is recommended to use UTC as the timezone with Druid.



Davide Anastasia

Jun 6, 2014, 10:20:06 AM
to druid-de...@googlegroups.com
Hi again,
I finally decided to recompile Druid for Hadoop 1.2.1 (I am still not sure what I am doing wrong).

What I am getting now on my workstation is the following:

2014-06-06 15:15:24,916 ERROR [main] io.druid.cli.CliHadoopIndexer - failure!!!!
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at io.druid.cli.CliHadoopIndexer.run(CliHadoopIndexer.java:113)
at io.druid.cli.Main.main(Main.java:92)
Caused by: java.lang.RuntimeException: java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: "edron/127.0.1.1"; destination host is: "localhost":54310; 
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at io.druid.indexer.DetermineHashedPartitionsJob.run(DetermineHashedPartitionsJob.java:184)
at io.druid.indexer.JobHelper.runJobs(JobHelper.java:134)
at io.druid.indexer.HadoopDruidDetermineConfigurationJob.run(HadoopDruidDetermineConfigurationJob.java:86)
at io.druid.indexer.JobHelper.runJobs(JobHelper.java:134)
at io.druid.cli.CliInternalHadoopIndexer.run(CliInternalHadoopIndexer.java:58)
at io.druid.cli.Main.main(Main.java:92)
... 6 more
Caused by: java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: "edron/127.0.1.1"; destination host is: "localhost":54310; 
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
at org.apache.hadoop.ipc.Client.call(Client.java:1410)
at org.apache.hadoop.ipc.Client.call(Client.java:1359)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy80.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy80.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:671)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1746)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1112)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1108)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1108)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1399)
at io.druid.indexer.JobHelper.setupClasspath(JobHelper.java:78)
at io.druid.indexer.DetermineHashedPartitionsJob.run(DetermineHashedPartitionsJob.java:108)


I am currently running Druid 0.6.105.

Thanks for your help so far.
Best,
D.




Nishant Bangarwa

Jun 6, 2014, 10:35:56 AM
to druid-de...@googlegroups.com
Hi Davide,

It seems to be caused by a Hadoop version mismatch.
Have you added hadoopCoordinates to your task spec? If not, specify hadoopCoordinates as "org.apache.hadoop:hadoop-client:1.2.1".
Another thing to note when recompiling against Hadoop 1.x: you will need to downgrade the version of com.google.protobuf:protobuf-java to 2.4.0a.
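
Roughly where it goes if you submit through the overlord (treat the exact nesting as a sketch and check it against the batch ingestion docs for your version):

{
  "type": "index_hadoop",
  "hadoopCoordinates": "org.apache.hadoop:hadoop-client:1.2.1",
  "config": { ... your existing HadoopDruidIndexer config ... }
}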




Davide Anastasia

Jun 6, 2014, 11:12:52 AM
to druid-de...@googlegroups.com
I have found the problem: in Hadoop 1.2.1, the jar containing the core library is in the HADOOP_HOME folder, not in HADOOP_HOME/lib.

Once I added the right folder to the classpath, the job ran smoothly.
Thanks to your UTC fix, the data is now also ingested into the right hourly buckets.
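
For the record, my invocation now looks roughly like this (heap size and paths are specific to my workstation; the command itself is the one from the docs):

java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
  -classpath "lib/*:$HADOOP_HOME/*:$HADOOP_HOME/lib/*:$HADOOP_HOME/conf" \
  io.druid.cli.Main index hadoop my_ingestion_spec.json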

I think I have almost put all the pieces together, but the historical node cannot read the segment, with the following error:

2014-06-06 15:08:18,342 ERROR [ZkCoordinator-0] io.druid.server.coordination.ZkCoordinator - Failed to load segment for dataSource: {class=io.druid.server.coordination.ZkCoordinator, exceptionType=class io.druid.segment.loading.SegmentLoadingException, exceptionMessage=Exception loading segment[campaign_report_2014-04-03T09:00:00.000Z_2014-04-03T10:00:00.000Z_2014-06-06T14:32:52.818Z], segment=DataSegment{size=28492, shardSpec=NoneShardSpec, metrics=[count, bid_count, bid_spend, win_spend, win_count, impression_count, click_count], dimensions=[online_user_cache_lookup_code, location_match_code, width, sspid, venue_owner_id, gender_lookup_code, age_lookup_code, isp_customer_request, campaign_id, country, publisher, banner_id, height, device, is_app, venue_id, matched_location_id, matched_location_group_id], version='2014-06-06T14:32:52.818Z', loadSpec={type=hdfs, path=/druid/segments/campaign_report/20140403T090000.000Z_20140403T100000.000Z/2014-06-06T14_32_52.818Z/0/index.zip}, interval=2014-04-03T09:00:00.000Z/2014-04-03T10:00:00.000Z, dataSource='campaign_report', binaryVersion='9'}}
io.druid.segment.loading.SegmentLoadingException: Exception loading segment[campaign_report_2014-04-03T09:00:00.000Z_2014-04-03T10:00:00.000Z_2014-06-06T14:32:52.818Z]
at io.druid.server.coordination.ZkCoordinator.addSegment(ZkCoordinator.java:129)
at io.druid.server.coordination.SegmentChangeRequestLoad.go(SegmentChangeRequestLoad.java:44)
at io.druid.server.coordination.BaseZkCoordinator$1.childEvent(BaseZkCoordinator.java:113)
at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:494)
at org.apache.curator.framework.recipes.cache.PathChildrenCache$5.apply(PathChildrenCache.java:488)
at org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:92)
at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:485)
at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
at org.apache.curator.framework.recipes.cache.PathChildrenCache$11.run(PathChildrenCache.java:755)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.druid.segment.loading.SegmentLoadingException: Path[/druid/segments/campaign_report/20140403T090000.000Z_20140403T100000.000Z/2014-06-06T14_32_52.818Z/0/index.zip] doesn't exist.
at io.druid.storage.hdfs.HdfsDataSegmentPuller.checkPathAndGetFilesystem(HdfsDataSegmentPuller.java:99)
at io.druid.storage.hdfs.HdfsDataSegmentPuller.getSegmentFiles(HdfsDataSegmentPuller.java:53)
at io.druid.segment.loading.OmniSegmentLoader.getSegmentFiles(OmniSegmentLoader.java:125)
at io.druid.segment.loading.OmniSegmentLoader.getSegment(OmniSegmentLoader.java:93)
at io.druid.server.coordination.ServerManager.loadSegment(ServerManager.java:144)
at io.druid.server.coordination.ZkCoordinator.addSegment(ZkCoordinator.java:125)
... 17 more

Thanks,
D.




Davide Anastasia

Jun 6, 2014, 11:48:19 AM
to druid-de...@googlegroups.com
Found the solution to this problem as well: as with the HadoopDruidIndexer, the historical node needs the Hadoop jar and conf folders on its classpath.
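
In case it helps anyone else, my historical launch now looks roughly like this (heap size and paths are specific to my setup):

java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
  -classpath "config/historical:lib/*:$HADOOP_HOME/*:$HADOOP_HOME/lib/*:$HADOOP_HOME/conf" \
  io.druid.cli.Main server historical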

I have my cluster up and running and my ingestion working :)
Success!

D.

Nishant Bangarwa

Jun 6, 2014, 11:57:01 AM
to druid-de...@googlegroups.com
Great to hear that it's working :)
Yeah, you'll need to add the Hadoop jars and config to the historicals when using HDFS as deep storage so that they can pull segments from there.


