Queries regarding the Druid indexer


Geeta Iyer

Apr 18, 2014, 5:27:17 AM
to druid-de...@googlegroups.com
Hi, 

I am new to Druid and have been exploring the indexing service on some sample data, and I have a few questions. I have sample data corresponding to 2 hours (00 and 01). The input data is around 1.5 GB per hour. The granularity spec in my configuration is hour. I tried running the Hadoop index task and the local index task on this data. My Hadoop cluster consists of 1 NameNode, 3 DataNodes, 1 Resource Manager, and 1 Job History Server. Each node has 12 GB of RAM. I am running the indexing service in local mode.

1. When I run the Hadoop index task on 1 hour's worth of data, it takes around 20-25 minutes to complete. The local index task, on the other hand, takes around 10 minutes.
2. When I run the Hadoop index task on 2 hours' worth of data, it takes around 1 hour to complete. The local index task, on the other hand, takes around 20 minutes.

Q. Why is there such a big difference between the execution times of the local index task and the Hadoop index task? How do the two compare in terms of performance? In what scenarios should I use the Hadoop index task? Given the same data, is the Hadoop index task supposed to run faster than the local index task?

3. The above runs were done with no partitionsSpec specified. Later, I modified the Hadoop index task to use hash-based partitioning. In that case, the determine_hashed_partitioning job succeeds. However, the index_generator job fails with the following exception:

Caused by: com.metamx.common.ISE: WTF?! No bucket found for row: MapBasedInputRow{…}

Q. Is there something else that needs to be specified to use hash partitioning? When should we opt for hash partitioning, and what advantages can we expect?

Q. I also observed that when running the Hadoop index task, the last two reducers took a long time to finish, which increases the job execution time significantly. What could be the reason for this behavior?





Nishant Bangarwa

Apr 20, 2014, 2:20:25 PM
to druid-de...@googlegroups.com
Hi Geeta,
Please see my replies inline.


On Fri, Apr 18, 2014 at 2:57 PM, Geeta Iyer <iyg...@gmail.com> wrote:
Q. Why is there such a big difference between the execution times of the local index task and the Hadoop index task? How do the two compare in terms of performance? In what scenarios should I use the Hadoop index task? Given the same data, is the Hadoop index task supposed to run faster than the local index task?
The main difference between hadoop_index_task and the local one is scalability. The local task generally works for up to 1-2 GB of data, whereas hadoop_index_task can scale much further, so hadoop_index_task is recommended in production, where the data volume is expected to grow over time.
As for performance, hadoop_index_task should not be much slower than the local one. Can you share some more details about your Hadoop task, such as the task logs and how the input file was provided? Was it a single compressed file (in which format?) or an uncompressed file? How many mappers and reducers were created?

3. The above runs were done with no PartitionSpec specified. Later, I modified the hadoop index task to consider hash based partitioning. In that case, the determine_hashed_partitioning job succeeds. However, the index_generator job fails with the following exception: 

Caused by: com.metamx.common.ISE: WTF?! No bucket found for row: MapBasedInputRow{…}
This was a known issue with hashed partitions and has been fixed. Can you try with the latest version of Druid?


Q. Is there something else that needs to be specified to use hash partitioning? When should we opt for hash partitioning, and what advantages can we expect?
The main advantages of hash-based partitioning are that it is faster and that it distributes data across segments more evenly than single-dimension partitioning.
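
To illustrate the distribution point, here is a minimal Python sketch. It is not Druid's actual partitioning code: the functions `hash_shard` and `range_shard`, the boundaries, and the sample rows are all hypothetical. It compares assigning rows to shards by a hash of all dimension values with assigning them by value ranges of a single dimension, for data in which one dimension value dominates.

```python
import hashlib
from collections import Counter


def hash_shard(row, num_shards):
    """Assign a row to a shard by hashing all of its dimension values."""
    key = "|".join(f"{k}={row[k]}" for k in sorted(row))
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards


def range_shard(row, dimension, boundaries):
    """Assign a row to a shard by comparing one dimension to sorted boundaries."""
    value = row[dimension]
    for i, upper in enumerate(boundaries):
        if value < upper:
            return i
    return len(boundaries)


# Hypothetical skewed data: 90% of rows share the same DEVICE_TYPE.
rows = [
    {"DEVICE_TYPE": "COMPUTER" if i % 10 else "MOBILE", "PAGE_NAME": f"page{i}"}
    for i in range(10000)
]

hashed = Counter(hash_shard(r, 4) for r in rows)
ranged = Counter(range_shard(r, "DEVICE_TYPE", ["D", "N", "T"]) for r in rows)

print("hashed:", sorted(hashed.items()))
print("single dimension:", sorted(ranged.items()))
```

In this sketch, no choice of boundaries on DEVICE_TYPE can split the rows that share the value COMPUTER, so one shard stays large, while the hash spreads rows roughly evenly.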

Q. I also observed that when running the Hadoop index task, the last two reducers took a long time to finish, which increases the job execution time significantly. What could be the reason for this behavior?
Do you mean that the reducer for index_generator is taking a long time?
Can you share the logs for those reducers?





--
You received this message because you are subscribed to the Google Groups "Druid Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to druid-developm...@googlegroups.com.
To post to this group, send email to druid-de...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/druid-development/c03c5743-34b2-4e65-8ee8-5b9aac00fdc7%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.




Geeta Iyer

Apr 21, 2014, 3:53:51 AM
to druid-de...@googlegroups.com
Hi Nishant,

Thanks for your reply.

I took the latest code and tried running my job again. I have yet to try out hash partitioning.
Currently I am using druid-services-0.6.91-SNAPSHOT.

My input files are plain text files and are not compressed. The records are in JSON format. Example record:

{"GEO_REGION":"00","CURRENT_PAGE_URL":"page_url","REFERRING_URL":"NULL","FA_LAST_INPUT":"NULL","DEVICE_ID":"generic web browser","COUNTRY_OF_PAGE":"glb","ACCOUNT_TYPE":"NULL","ACCOUNT_COUNTRY":"NULL","timestamp":"2014-03-15T00:00:00Z","REFERRER_NAME":"NULL","LINK_URL":"NULL","MOBILE_DEVICE_NAME":"NULL","GEO_STATE":"NULL","PAGE_NAME":"page","TEMPLATE_NAME":"NULL","GEO_COUNTRY":"PR","PAGE_ERROR_STRING":"NULL","INTERNAL_SEARCH_TERM":"NULL","BUSINESS_CHANNEL_NAME":"ec","PAGE_LINK_NAME":"NULL","SERVER_BUSINESSNAME":"main","REFERRER_SEARCH_KEYWORD":"NULL","REFERRER_TYPE":"NULL","PAGE_GROUP":"page_grp","ACCOUNT_VERIFIED":"NULL","PAGEGROUP_LINK_NAME":"NULL","SCREEN_WIDTH":"1920","LINK_NAME":"NULL","DEVICE_TYPE":"COMPUTER","SCREEN_HEIGHT":"1200","CLIENT_OS":"MAC", "count": 1 }


For 1 hour's worth of data with no partitioning, the index_generator job launched 150 mappers and 24 reducers.
This time the job took around 31-32 minutes to complete. Again, one of the reducers took around 27 minutes to complete, while the other reducers completed in anywhere from a few seconds to 5-6 minutes.
I am attaching the task log for the reducer that took the longest to execute.
reducer.log

Deepak Jain

Apr 21, 2014, 4:53:49 AM
to druid-de...@googlegroups.com
Hello Geeta,
Can you share the sample raw data, the spec file, and the commands to start HadoopDruidIndexer?
I am just getting started with it and have set up a single-node Hadoop cluster (pseudo-distributed mode).
Regards,
Deepak

Nishant Bangarwa

Apr 21, 2014, 5:10:34 AM
to druid-de...@googlegroups.com
Hi Geeta, 

I noticed from the job logs that you ran the job with targetPartitionSize = -1. The interval for the job is 1 day, and the segments are being created at hourly granularity.

Druid creates one reducer per segment, i.e. 24 reducers in your case, one for each hour. Each of the 24 reducers creates the segment for one particular hour, and since the input data is not evenly distributed across the 24 hours, one of the reducers gets most of the data to process. In general, it is recommended to run the job with a coarser granularity, e.g. day, and to use a hashed partitionsSpec with a targetPartitionSize of 5000000. In that case Druid will automatically shard the segments and distribute the data evenly across the reducers.

Increasing the heap given to the reducers and the rowFlushBoundary in the rollupSpec will also help improve performance.
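
One way to check for this kind of skew before running the job is to count the input rows per hourly bucket. The following is a small Python sketch, not part of Druid. It assumes newline-delimited JSON records with an ISO-8601 UTC `timestamp` field, as in the example record earlier in the thread; the function name `rows_per_hour` and the sample lines are made up for illustration.

```python
import json
from collections import Counter


def rows_per_hour(lines):
    """Count input rows per hourly bucket (first 13 chars of the ISO timestamp)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        # "2014-03-15T00:00:00Z"[:13] -> "2014-03-15T00"
        counts[record["timestamp"][:13]] += 1
    return counts


# Hypothetical sample; in practice, pass an open file object instead.
sample = [
    '{"timestamp": "2014-03-15T00:00:00Z", "count": 1}',
    '{"timestamp": "2014-03-15T00:12:30Z", "count": 1}',
    '{"timestamp": "2014-03-15T00:59:59Z", "count": 1}',
    '{"timestamp": "2014-03-15T01:05:00Z", "count": 1}',
]

counts = rows_per_hour(sample)
for hour, n in sorted(counts.items()):
    print(hour, n)
```

If one hour holds most of the rows, the reducer building that hour's segment will do most of the work when there is one reducer per hourly segment.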
  




Geeta Iyer

Apr 21, 2014, 7:13:04 AM
to druid-de...@googlegroups.com
Hi Nishant,

I made some changes to use hash partitioning. Basically, I added the following to my config:

 "partitionsSpec": {
      "type": "hashed",
      "targetPartitionSize": 500000
    },

I also changed the granularity spec to specify only one hour, since I have only one hour's worth of data.

 "granularitySpec" : {
      "type" : "uniform",
      "gran" : "HOUR",
      "intervals" : [ "2014-03-15T00:00:00Z/2014-03-15T01:00:00Z" ]
    },


Now the determine_partitions_hashed job is launched with one reducer. What are the criteria for deciding the number of reducers for hash-based partitioning? Is there a way to increase the number of reducers, and would that have any benefit?

Even with the latest code, the index_generator job is failing with the following error:

Caused by: com.metamx.common.ISE: WTF?! No bucket found for row: MapBasedInputRow{timestamp=2014-03-15T00:00:00.000Z, event={geo_region=00, current_page_url=http://realstreetperformance.com/Products/Supertech/?sort1desc=F&range=541,570,622&sort1=Item_NAME, referring_url=NULL, fa_last_input=NULL, device_id=generic web browser, country_of_page=NULL, account_type=NULL, account_country=NULL, timestamp=2014-03-15T00:00:00Z, referrer_name=NULL, link_url=NULL, mobile_device_name=NULL, geo_state=NULL, page_name=ppmn:cookie:::sync, template_name=NULL, geo_country=AW, page_error_string=NULL, internal_search_term=NULL, business_channel_name=NULL, page_link_name=NULL, server_businessname=NULL, referrer_search_keyword=NULL, referrer_type=NULL, page_group=ppmn:cookie:::sync, account_verified=NULL, pagegroup_link_name=NULL, screen_width=NULL, link_name=NULL, device_type=COMPUTER, screen_height=NULL, client_os=WI7, count=1}, dimensions=[PAGE_GROUP, PAGE_NAME, PAGEGROUP_LINK_NAME, PAGE_LINK_NAME, LINK_URL, LINK_NAME, GEO_REGION, GEO_COUNTRY, GEO_STATE, TEMPLATE_NAME, INTERNAL_SEARCH_TERM, BUSINESS_CHANNEL_NAME, DEVICE_ID, DEVICE_TYPE, MOBILE_DEVICE_NAME, SCREEN_HEIGHT, SCREEN_WIDTH, CLIENT_OS, REFERRER_SEARCH_KEYWORD, REFERRER_NAME, REFERRING_URL, REFERRER_TYPE, ACCOUNT_TYPE, ACCOUNT_VERIFIED, ACCOUNT_COUNTRY, PAGE_ERROR_STRING, FA_LAST_INPUT, COUNTRY_OF_PAGE, SERVER_BUSINESSNAME, CURRENT_PAGE_URL]}
        at io.druid.indexer.IndexGeneratorJob$IndexGeneratorMapper.innerMap(IndexGeneratorJob.java:200)
        at io.druid.indexer.HadoopDruidIndexerMapper.map(HadoopDruidIndexerMapper.java:77)
        ... 9 more

Can you please help with this?

Nishant Bangarwa

Apr 21, 2014, 7:46:30 AM
to druid-de...@googlegroups.com
Hi Geeta, 

Druid creates one reducer per shard. The number of shards is determined by dividing the total number of rows after rollup by the targetPartitionSize, so reducing the targetPartitionSize should increase the number of shards and therefore the number of reducers.
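
As a rough worked example of that division, here is a short Python sketch. Rounding up and the minimum of one shard are assumptions made for illustration, not something confirmed in this thread, and the row count of 12,000,000 is hypothetical.

```python
import math


def estimated_shards(rows_after_rollup, target_partition_size):
    """Estimate shards as rows after rollup divided by targetPartitionSize.

    Rounding up and the floor of one shard are assumptions of this sketch.
    """
    return max(1, math.ceil(rows_after_rollup / target_partition_size))


rows = 12_000_000  # hypothetical row count after rollup

for target in (5_000_000, 500_000):
    print(target, estimated_shards(rows, target))
```

With these hypothetical numbers, lowering the target from 5,000,000 to 500,000 raises the estimate from 3 shards to 24, and with one reducer per shard the reducer count rises with it.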

Regarding the exception, can you share the complete task logs as well?




Geeta Iyer

Apr 22, 2014, 4:59:41 AM
to druid-de...@googlegroups.com
Hi Nishant,

Please find attached the task logs for the index generator job.

Regarding my question on the number of reducers, I was referring to the number of reducers for the determine_partitions_hashed job. This job is launched with one reducer, so I was wondering whether there is a way to increase the number of reducers for it as well.
mapper.log

Nishant Bangarwa

Apr 22, 2014, 6:54:09 AM
to druid-de...@googlegroups.com
Hi Geeta, 
In the mapper logs the shardSpecs seem to be empty. The determine partitions job is responsible for setting these, so I wonder whether there were any other errors in the task logs. Can you share the Druid Hadoop index task logs as well?

For hashed partitions, the determine partitions job works by estimating the cardinality of each segment-granular interval, and each reducer estimates the cardinality of one interval. In your case, running on 1 hour of data with hourly granularity creates 1 reducer. If you try indexing 1 day of data with hourly granularity, it should use multiple reducers.
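
The relationship between the job interval and the reducer count can be sketched in Python as follows. This only counts hourly segment intervals, assuming one reducer per interval as described above; the helper `hourly_buckets` is hypothetical and assumes the start time is aligned to the hour.

```python
from datetime import datetime, timedelta


def hourly_buckets(start, end):
    """Return the hourly segment intervals covered by [start, end)."""
    buckets = []
    current = start
    while current < end:
        buckets.append((current, current + timedelta(hours=1)))
        current += timedelta(hours=1)
    return buckets


# The one-hour interval from the granularitySpec posted earlier in the thread.
one_hour = hourly_buckets(datetime(2014, 3, 15, 0), datetime(2014, 3, 15, 1))
# A full day at hourly granularity.
one_day = hourly_buckets(datetime(2014, 3, 15, 0), datetime(2014, 3, 16, 0))

print(len(one_hour), len(one_day))
```

Under that assumption, the one-hour interval yields a single interval and so a single reducer, while a full day at hourly granularity yields 24.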




Geeta Iyer

Apr 22, 2014, 7:50:03 AM
to druid-de...@googlegroups.com
Please find the Hadoop index task logs attached. When you mention an exception, are you referring to the determine partitions job? That job seems to complete successfully, although in the job summary the Reduce output records counter is 0. Is this expected?



index_hadoop_fptidemo_2014-04-22T08:33:15.314Z.log

Nishant Bangarwa

Apr 22, 2014, 9:11:47 AM
to druid-de...@googlegroups.com
Hi Geeta, 
The record output counter is fine.
The reducer writes the cardinality of each interval to a file, and the index task reads it from that file.
In the task logs there is an INFO message saying the task could not find that file:
2014-04-22 08:43:00,100 INFO [task-runner-0] io.druid.indexer.DetermineHashedPartitionsJob - Path[/tmp/druid-indexing/fptidemo/2014-04-22T083315.314Z/20140315T000000.000Z_20140315T010000.000Z/partitions.json] didn't exist!?

Is it failing every time you run the job?
Have you enabled compression for MapReduce output in your Hadoop config files?
If so, can you try disabling compression for the MapReduce job and see if that helps?
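
For anyone wanting to check this quickly, here is a small Python sketch that scans the text of a Hadoop site file (for example mapred-site.xml) for compression properties set to true. The property names listed are the standard Hadoop 2 names and their older Hadoop 1 equivalents; which of them actually affected this job is not stated in the thread, and the function name and sample XML are made up for illustration.

```python
import xml.etree.ElementTree as ET

COMPRESSION_KEYS = {
    "mapreduce.output.fileoutputformat.compress",  # Hadoop 2: job output
    "mapreduce.map.output.compress",               # Hadoop 2: intermediate map output
    "mapred.output.compress",                      # older name: job output
    "mapred.compress.map.output",                  # older name: map output
}


def enabled_compression_settings(xml_text):
    """Return the compression properties that are set to true in a site file."""
    root = ET.fromstring(xml_text)
    enabled = []
    for prop in root.findall("property"):
        name = (prop.findtext("name") or "").strip()
        value = (prop.findtext("value") or "").strip().lower()
        if name in COMPRESSION_KEYS and value == "true":
            enabled.append(name)
    return sorted(enabled)


# Hypothetical site file contents.
sample_site_xml = """<configuration>
  <property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
  </property>
  <property>
    <name>mapreduce.map.output.compress</name>
    <value>false</value>
  </property>
</configuration>"""

print(enabled_compression_settings(sample_site_xml))
```

Any property reported here is a candidate to set to false for the indexing job while testing whether compression is the cause.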






Geeta Iyer

Apr 23, 2014, 7:49:15 AM
to druid-de...@googlegroups.com
Hi Nishant,

Disabling the compression actually got the job to execute successfully. Thank you very much for your help.

Nishant Bangarwa

Apr 23, 2014, 8:10:06 AM
to druid-de...@googlegroups.com
Thanks for confirming this.
There was an issue with file extensions not being checked properly when compression is enabled.
We have fixed this, and the fix will be available in the next release.


