How to trace a request for a segment loaded via the Indexing Service

Wayne Adams

Jan 8, 2014, 8:18:35 PM
to druid-de...@googlegroups.com
I'm using 0.6.39.  I have recently loaded data successfully using the old HadoopDruidIndexerMain on 0.5.48, so I have seen data ingested before.  Just not yet on 0.6 using the Indexing Service, which is what I'm using today.

I submit my task to the overlord, it completes successfully, the segment gets created, and an entry appears in the druid_segments table.  I've verified that the segment actually exists on disk (corresponding to the value in the druid_segments "payload" field).  All queries I run against Druid return empty arrays, including the time-boundary query.  Queries return empty arrays both from the broker node and from the historical node.

What steps do you recommend I take to determine what is wrong?  Normally I would supply chunks of log file, MySQL entries, etc. but everything looks exactly as it did when I successfully loaded on 0.5 with the old-style indexer, meaning that there's nothing obviously wrong with any of them.  But I was hoping to get some advice on where to tweak logging to try to trace a query, for example from the compute node, ultimately to the local file location of the segment.  Thanks in advance --

Regards, Wayne

Fangjin Yang

Jan 9, 2014, 12:55:55 PM
to druid-de...@googlegroups.com
Hi Wayne,

Just to clarify, when you say the segment exists on disk, you mean that it exists on the disk of the historical (formerly called the compute) node?

There are a few things you can do to make sure that Druid knows about the segment.

First, check the Coordinator (formerly called the master) console located at <COORDINATOR_IP>:<PORT>/cluster.html

This will give you a view as to what historical nodes have loaded what segments.

If you do not see your segment here, on your historical node, check for logs such as:

Announcing segment[foo] at path[blah]

These logs indicate that a segment has been announced in Zookeeper.
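
A minimal sketch for pulling these announcements out of a historical node log, assuming the message text has the form quoted above. The function name and the sample line in the usage note are illustrative, not part of Druid.

```python
import re

# Pattern based on the log message quoted above; whatever prefix the logger
# adds (timestamp, level, thread) is ignored by using search().
ANNOUNCE_RE = re.compile(
    r"Announcing segment\[(?P<segment>.+?)\] at path\[(?P<path>.+?)\]"
)


def find_announcements(lines):
    """Return (segment, path) pairs for every announcement line found."""
    found = []
    for line in lines:
        match = ANNOUNCE_RE.search(line)
        if match:
            found.append((match.group("segment"), match.group("path")))
    return found
```

Feeding it the lines of a log file (for example `find_announcements(open(log_path))`, where `log_path` is wherever your historical node writes its log) gives a quick list of what that node has announced.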

If all of this is happening and the coordinator seems fine, make sure that, if the data for a given time range is partitioned, all of its partitions have been loaded. Querying a time range whose data is incomplete (not all partitions have been loaded) can return empty results.

What would probably help the most with debugging are the coordinator logs about assigning the segment to a historical node, and the historical node logs about downloading and serving the segment.

Thanks!
FJ

Wayne Adams

Jan 9, 2014, 3:00:14 PM
to druid-de...@googlegroups.com
Hi Fangjin:

  I've been trying to understand a little more about the problem and so have some background data before I answer your question.

  First, the data I'm trying to load is about 80 columns, about 450 bytes per line.  In a given day there are about 38 million rows, plus or minus.  Previously, when I was using the old indexer on 0.5.48, I had managed to load a partial dayfile that contained about 30M rows covering 20 hours of data.  At that time, after the load completed, it took several hours for queries to return data (that is, they returned empty arrays until then), including the time-boundary query.  I think I'm just now understanding your comment about partial queries if data hasn't loaded yet, and I'm wondering if the time-boundary query actually requires that everything be loaded (it would make sense).

  When I switched to 0.6 indexing service, I was monitoring in the overlord console and saw success, then could look at the status and when I replace the "status" URI frag with "segments" I can see the segment appear, just as it is in the MySQL database.  When I stop everything (all Druid processes, zookeeper etc.) and restart, the console is empty, which seemed a little odd.  The segment is in-place on disk at the location specified in the metadata returned from the console and a MySQL query.  They are still there even though the overlord console no longer shows the segment after a restart.

At the overlord's "cluster.html" link, I get the "Loading segment data... this may take a few minutes" and it never returns.

For my background investigation, I went back to using the HadoopDruidIndexerMain, but in 0.6 (that is, "io.druid.cli.Main index hadoop"), starting with a small file, for successively larger subsets of the original 1M-row file, using head -nnn on the original file.  For files containing the first 10, 100, 1000, 10000, and 100000 lines of the original file, the time-boundary query returns a non-empty array within at most 15-30 seconds of the completion of the load, but when I load the full 1M lines, I get the same result as when I use the new Indexing Service (which I believe is actually just a thin wrapper around the old Hadoop indexer, right?) -- all queries on that load (which I place in a different datasource to avoid confusion) return empty arrays.

I'm assuming that my configuration must be producing a segment which the historical node cannot bring into memory.  I feel I'm somewhat constrained on granularity, etc. because of the one-reducer/one-segment issue, but I'd like to know more about why this doesn't seem to work well before I go off and build a full Hadoop cluster just to see what happens.

Re: querying over a certain time range:  the time-boundary query should work regardless, right?  I suspect that that query must return data before I even try executing any other type of query.  Or does that query require everything be loaded, from the entire datasource?

If I have a file that is approximately 17GB in size, containing 24 hours of data at minute granularity, where there are about 30K rows of data per minute, what are reasonable values to use for the granularity spec and the rollup granularity?  I've had to set targetPartitionSize to 0 to avoid the partitioning-algorithm error noted elsewhere. I suspect if I could segment my data per hour that things would improve (although this 1M-row example is only 25 minutes of data), but I don't know if I can even do that with the default local Hadoop instance unless I chop my data files into hourfiles.  Should I set up a real Hadoop cluster?

One last thing -- just a couple of minutes ago, I tried loading the smaller 100K-row table using the indexing service.  That appeared to succeed in the console, and within seconds I was able to query it.  I did not see any "Announcing segment..." messages in any of the Druid-node log files.

For 1M row file, index.zip is about 1.7GB
For 100K row file, index.zip is about 63MB

Sorry for all that detail, hope this info helps, and once again thanks for your help!

Regards, Wayne

Wayne Adams

Jan 9, 2014, 6:34:31 PM
to druid-de...@googlegroups.com
I should add that I noticed one more thing today when I was comparing ingesting via the Indexing Service and via the old-style "io.druid.cli.Main index hadoop", both in 0.6.  The dataSource segment storage directory is duplicated when ingesting via the Indexing Service vs. the old-style indexer.  In other words, in indexing service, the segment is e.g. under

  .../segments/dataSourceName/dataSourceName/2013-08-02T00:00:00.000Z_2013-08-02T01:00:00.000Z

while with the old-style indexer, it's stored under (e.g.)

  .../segments/dataSourceName/2013-08-02T00:00:00.000Z_2013-08-03T00:00:00.000Z

For the ingest jobs that succeed (for me), this doesn't seem to hurt anything (the metadata in MySQL reflects the duplication, so I don't think any Druid process has difficulty finding the segment), but it appears something got appended twice.
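
A quick way to spot this kind of duplication in a storage path is to look for the same component twice in a row. This is only a sketch; the helper name is made up for illustration.

```python
def duplicated_components(path):
    """Return the path components that appear twice in a row."""
    parts = [part for part in path.split("/") if part]
    return [a for a, b in zip(parts, parts[1:]) if a == b]
```

Applied to the two example paths above, it reports `dataSourceName` for the Indexing Service path and nothing for the old-style indexer path.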

Thanks -- Wayne

Fangjin Yang

Jan 10, 2014, 1:01:06 AM
to druid-de...@googlegroups.com
Hi Wayne, some comments inline:


On Thursday, January 9, 2014 12:00:14 PM UTC-8, Wayne Adams wrote:
Hi Fangjin:

  I've been trying to understand a little more about the problem and so have some background data before I answer your question.

  First, the data I'm trying to load is about 80 columns, about 450 bytes per line.  In a given day there are about 38 million rows, plus or minus.  Previously, when I was using the old indexer on 0.5.48, I had managed to load a partial dayfile that contained about 30M rows covering 20 hours of data.  At that time, after the load completed, it took several hours for queries to return data (that is, they returned empty arrays until then), including the time-boundary query.  I think I'm just now understanding your comment about partial queries if data hasn't loaded yet, and I'm wondering if the time-boundary query actually requires that everything be loaded (it would make sense).

Druid's sharding model always first partitions data by time. Typically, we like to keep segments at around 1G in size. Depending on your segment granularity, additional partitioning for a time interval may be required (for example, if the cardinality of a single dimension for a certain time range is too high). Hence, it is possible that multiple segments are created that have the same datasource, time interval, and version, but differ in the partition number (Druid segments are identified as datasource_interval_version_partitionNum).  In such cases, all partitioned segments for a time interval must be loaded in Druid before that time interval is queryable. Without seeing what segments actually exist in your cluster, it is difficult to determine if this is happening. I am surprised that it took several hours after the load before queries returned results. What is the size of your segments, and does network latency play a factor in your setup?

  When I switched to 0.6 indexing service, I was monitoring in the overlord console and saw success, then could look at the status and when I replace the "status" URI frag with "segments" I can see the segment appear, just as it is in the MySQL database.  When I stop everything (all Druid processes, zookeeper etc.) and restart, the console is empty, which seemed a little odd.  The segment is in-place on disk at the location specified in the metadata returned from the console and a MySQL query.  They are still there even though the overlord console no longer shows the segment after a restart.

Overlords maintain a history of the jobs they've recently run in memory, so restarting the overlord will wipe old completed jobs. Do the historical node logs give any indication that they have begun downloading a segment once it has appeared in MySQL? Similarly, do the coordinator logs state anything about assigning a newly created segment to the historicals?
 
At the overlord's "cluster.html" link, I get the "Loading segment data... this may take a few minutes" and it never returns.

I believe you should be hitting /console.html on the overlord. The overlord console does reuse some code from the coordinator console. Accessing /cluster.html on the Coordinator (formerly called the master) console should show the status of your cluster.
 
For my background investigation, I went back to using the HadoopDruidIndexerMain, but in 0.6 (that is, "io.druid.cli.Main index hadoop"), starting with a small file, for successively larger subsets of the original 1M-row file, using head -nnn on the original file.  For files containing the first 10, 100, 1000, 10000, and 100000 lines of the original file, the time-boundary query returns a non-empty array within at most 15-30 seconds of the completion of the load, but when I load the full 1M lines, I get the same result as when I use the new Indexing Service (which I believe is actually just a thin wrapper around the old Hadoop indexer, right?) -- all queries on that load (which I place in a different datasource to avoid confusion) return empty arrays.

The indexing service is a bit more complex than just a wrapper around the old Hadoop indexer. Hadoop indexing is one task that can run on the indexing service, but there are many other tasks that can run as well. From your symptoms, it sounds like something fishy is going on with either the coordinator assigning a segment to a historical node or the historical node being able to download and serve the segment.

I'm assuming that my configuration must be producing a segment which the historical node cannot bring into memory.  I feel I'm somewhat constrained on granularity, etc. because of the one-reducer/one-segment issue, but I'd like to know more about why this doesn't seem to work well before I go off and build a full Hadoop cluster just to see what happens.

The historical node logs will give more insight as to what is happening.
 

Re: querying over a certain time range:  the time-boundary query should work regardless, right?  I suspect that that query must return data before I even try executing any other type of query.  Or does that query require everything be loaded, from the entire datasource?

Time-boundary queries will work as long as at least one queryable segment is loaded. For example, suppose there is a week of data in 7 daily-granularity segments. As soon as one segment is loaded, time-boundary queries should return results. These segment identifiers may look like so:
sampleDatasource_2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z_2013-01-04T04:09:13.590Z
sampleDatasource_2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z_2013-01-04T04:09:13.590Z
sampleDatasource_2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z
...
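
For reference, a minimal sketch of issuing a time-boundary query from Python. It assumes the node accepts JSON queries via POST at `/druid/v2/`; the base URL in the usage note is a placeholder for your own broker or historical node, and the helper names are illustrative.

```python
import json
import urllib.request


def time_boundary_query(datasource):
    """Build the JSON body of a time-boundary query for one datasource."""
    return {"queryType": "timeBoundary", "dataSource": datasource}


def post_query(base_url, query, timeout=30):
    """POST a query to a node and return the parsed JSON response.

    The /druid/v2/ path is an assumption about the query endpoint;
    adjust it if your deployment differs.
    """
    request = urllib.request.Request(
        base_url.rstrip("/") + "/druid/v2/",
        data=json.dumps(query).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Something like `post_query("http://<BROKER_IP>:<PORT>", time_boundary_query("sampleDatasource"))` would then return an empty list for as long as no queryable segment is loaded for that datasource.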

Now, if there is a week's worth of data and each day has to be further partitioned into two partition segments, then if only a single partition segment is loaded, queries will not return results until all partitions of that time interval are loaded. In this case, segment identifiers may look like so:
sampleDatasource_2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z_2013-01-04T04:09:13.590Z_0
sampleDatasource_2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z_2013-01-04T04:09:13.590Z_1
sampleDatasource_2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z_2013-01-04T04:09:13.590Z_0
sampleDatasource_2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z_2013-01-04T04:09:13.590Z_1
... 
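
A small sketch for splitting identifiers like the ones above into their parts and grouping partition numbers by interval. It assumes the exact layout shown in these examples (datasource, start/end, version, optional partition number); real identifiers may be formatted differently, and the names below are illustrative.

```python
import re
from typing import NamedTuple, Optional

# Timestamp layout as it appears in the example identifiers above.
TS = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z"
SEGMENT_RE = re.compile(
    rf"^(?P<datasource>.+)_(?P<start>{TS})/(?P<end>{TS})_(?P<version>{TS})(?:_(?P<partition>\d+))?$"
)


class SegmentId(NamedTuple):
    datasource: str
    start: str
    end: str
    version: str
    partition: Optional[int]  # None when the identifier has no partition suffix


def parse_segment_id(identifier):
    """Split one identifier into its components, or raise ValueError."""
    match = SEGMENT_RE.match(identifier.strip())
    if match is None:
        raise ValueError(f"unrecognized segment identifier: {identifier!r}")
    partition = match.group("partition")
    return SegmentId(
        match.group("datasource"),
        match.group("start"),
        match.group("end"),
        match.group("version"),
        int(partition) if partition is not None else None,
    )


def partitions_by_interval(identifiers):
    """Group partition numbers by (datasource, start, end, version)."""
    groups = {}
    for identifier in identifiers:
        seg = parse_segment_id(identifier)
        key = (seg.datasource, seg.start, seg.end, seg.version)
        groups.setdefault(key, []).append(seg.partition)
    return groups
```

Note that the identifier alone does not say how many partitions an interval should have, so this only shows which partitions are present; the expected count has to come from the segment metadata.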

If I have a file that is approximately 17GB in size, containing 24 hours of data at minute granularity, where there are about 30K rows of data per minute, what are reasonable values to use for the granularity spec and the rollup granularity? 

Daily or hourly segment granularity seems reasonable here.
 
I've had to set targetPartitionSize to 0 to avoid the partitioning-algorithm error noted elsewhere.

Which partitioning-algorithm error is this?
 
I suspect if I could segment my data per hour that things would improve (although this 1M-row example is only 25 minutes of data), but I don't know if I can even do that with the default local Hadoop instance unless I chop my data files into hourfiles.  Should I set up a real Hadoop cluster?

For data volumes of this size, I don't think you need full-blown Hadoop indexing yet.

One last thing -- just a couple of minutes ago, I tried loading the smaller 100K-row table using the indexing service.  That appeared to succeed in the console, and within seconds I was able to query it.  I did not see any "Announcing segment..." messages in any of the Druid-node log files.

That seems strange. What kind of logs do you see on your historical nodes?

Fangjin Yang

Jan 10, 2014, 1:01:55 AM
to druid-de...@googlegroups.com
Nice catch! We'll try and fix this.

Fangjin Yang

Jan 10, 2014, 1:07:03 AM
to druid-de...@googlegroups.com
BTW, another thing to note is that because you are setting your target partition size to 0, you are not getting any partitioning for a given time range. I think the best bet is to take a look at your coordinator and historical logs to better understand why it is taking so long for a segment to be queryable.

Wayne Adams

Jan 10, 2014, 12:08:14 PM
to druid-de...@googlegroups.com
Hi Fangjin:

  Thanks for all the good pointers!  I will work my way through them today but have a couple of comments already:

1)  If I set the targetPartitionSize to any value other than 0, I get:

2014-01-10 21:48:16,221 INFO [Thread-33] org.apache.hadoop.mapred.Task - Task 'attempt_local_0002_m_000000_0' done.
2014-01-10 21:48:16,223 INFO [Thread-33] org.apache.hadoop.mapred.Task -  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@7928ec65
2014-01-10 21:48:16,223 INFO [Thread-33] org.apache.hadoop.mapred.LocalJobRunner - 
2014-01-10 21:48:16,224 INFO [Thread-33] org.apache.hadoop.mapred.Merger - Merging 1 sorted segments
2014-01-10 21:48:16,224 INFO [Thread-33] org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 1 segments left of total size: 33 bytes
2014-01-10 21:48:16,224 INFO [Thread-33] org.apache.hadoop.mapred.LocalJobRunner - 
2014-01-10 21:48:16,227 INFO [Thread-33] io.druid.indexer.DeterminePartitionsJob - Determining partitions for interval: 2013-08-02T00:00:00.000Z/2013-08-03T00:00:00.000Z
2014-01-10 21:48:16,228 WARN [Thread-33] org.apache.hadoop.mapred.LocalJobRunner - job_local_0002
com.metamx.common.ISE: No suitable partitioning dimension found!
	at io.druid.indexer.DeterminePartitionsJob$DeterminePartitionsDimSelectionReducer.innerReduce(DeterminePartitionsJob.java:689)
	at io.druid.indexer.DeterminePartitionsJob$DeterminePartitionsDimSelectionBaseReducer.reduce(DeterminePartitionsJob.java:480)
	at io.druid.indexer.DeterminePartitionsJob$DeterminePartitionsDimSelectionBaseReducer.reduce(DeterminePartitionsJob.java:453)
	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:260)

By "noted elsewhere" I was referring to another post on the group titled "HadoopIndexerProblem using CSV".  I don't know if my issue is similar to that one (I certainly have more rows!), but on the other hand I've not yet been able to ingest any file without having to set this value to 0 first.

I have around 90 columns or so.  These records are beacon messages from client applications on our network, where every client sends a message once per minute.  For this particular file, that's about 35K messages per minute.  A lot of the column values are null.  There is one column which identifies each client, so if "n" is number of rows in a minute, then the cardinality of that dimension, for that minute, is also "n".  I have another column that I'm reasonably certain will never be null and will have a much lower cardinality (~ a few hundred) per minute.  But if I specify a value for targetPartitionSize and use that column as the partitionDimension, I still get the above error message.  If I set a value for targetPartitionSize and do not specify a partitionDimension, I get the above error message.  If I set it to 0, the job runs, but then I get the really large segments and the empty-response queries after load completes.  I'm not sure I understand how to use this spec to control the partition size, including its limitations.  If I understood that better it would be great, as I could have control over how large these things actually get.

I'll have more info later...

Thanks again -- Wayne

Wayne Adams

Jan 10, 2014, 1:16:34 PM
to druid-de...@googlegroups.com
Hi Fangjin:

  I have a partial solution to my issue.  The problem turned out to be this:  I had started the Historical node with the default properties shown in the documentation, and it wasn't clear to me at the time that "druid.server.maxSize" refers to the maximum size of segment that the node will query before it defaults to returning an empty array.  This was set to 100M.  When I restarted with it set to 10G everything worked!  Note:  index.zip is about 63MB; not sure how that relates to "druid.server.maxSize" but it was definitely lower than the max before, when the query was not returning.

  I don't quite understand the relationship between the following Historical-node properties:

druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=1000000000
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]

but I bumped all 3 of them up.  Did I need to do that?

btw "maxSegmentSize" might have caught my eye a lot sooner than "maxSize".  ;->

So, until I can figure out why my ingest crashes every time I set targetPartitionSize to a value other than 0, I can at least control the segment size by loading my data by hour.  I still haven't solved the issue of the failure I get when I try to load a full day.  For now I'm going to try to load per hour, and when I figure out how to control the partition size I'll try loading a full day again.

Thanks, Fangjin!  -- Wayne

Fangjin Yang

Jan 10, 2014, 1:49:22 PM
to druid-de...@googlegroups.com
Hi Wayne:


On Friday, January 10, 2014 10:16:34 AM UTC-8, Wayne Adams wrote:
Hi Fangjin:

  I have a partial solution to my issue.  The problem turned out to be this:  I had started the Historical node with the default properties shown in the documentation, and it wasn't clear to me at the time that "druid.server.maxSize" refers to the maximum size of segment that the node will query before it defaults to returning an empty array.  This was set to 100M.  When I restarted with it set to 10G everything worked!  Note:  index.zip is about 63MB; not sure how that relates to "druid.server.maxSize" but it was definitely lower than the max before, when the query was not returning.

druid.server.maxSize refers to the maximum total size of segments (in bytes) that a node will accept. In the examples, the default maxSize is relatively small, but that parameter should be adjusted if you are running in more powerful machines. You bring up a good point that the maxSize showcased in the examples should probably be something greater. Awesome to hear that things are working though.
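
To make the "maximum total size" reading concrete, here is a deliberately simplified model, not Druid's actual assignment logic: a node takes a new segment only if everything it holds still fits under maxSize. The segment size used in the usage note is hypothetical, since the unpacked size of the 63MB index.zip is not given in this thread.

```python
def can_accept(max_size, loaded_sizes, new_segment_size):
    """Simplified model: accept a segment only if the total stays within max_size.

    max_size         -- value of druid.server.maxSize, in bytes
    loaded_sizes     -- sizes in bytes of segments the node already holds
    new_segment_size -- size in bytes of the candidate segment
    """
    return sum(loaded_sizes) + new_segment_size <= max_size
```

With maxSize at 100M (taken here as 100,000,000 bytes), `can_accept(100_000_000, [], 150_000_000)` is False for a hypothetical 150,000,000-byte segment, while `can_accept(10_000_000_000, [], 150_000_000)` is True, which matches the before-and-after behaviour described above.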
 
  I don't quite understand the relationship between the following Historical-node properties:

druid.server.maxSize=10000000000
druid.processing.buffer.sizeBytes=1000000000
druid.segmentCache.locations=[{"path": "/tmp/druid/indexCache", "maxSize"\: 10000000000}]

but I bumped all 3 of them up.  Did I need to do that?

There is more information about these parameters here: http://druid.io/docs/latest/Configuration.html and how they interoperate.

The processing buffer size is used to store intermediate computation results and by default is set to 1G in size. 
 
btw "maxSegmentSize" might have caught my eye a lot sooner than "maxSize".  ;->

So, until I can figure out why my ingest crashes every time I set targetPartitionSize to a value other than 0, I can at least control the segment size by loading my data by hour.  I still haven't solved the issue of the failure I get when I try to load a full day.  For now I'm going to try to load per hour, and when I figure out how to control the partition size I'll try loading a full day again.

It is very interesting that the ingest will crash. Can you share logs for this case? 

Wayne Adams

Jan 10, 2014, 6:28:32 PM
to druid-de...@googlegroups.com
Hi Fangjin:

  Re: the load that will crash -- I'm going to run that particular job soon and update you later.  That was when I was trying to load a full day's data in the "old style" Hadoop indexer, not using the Indexing Service.  I don't have that log any more so I'll have to regenerate it, but as I remember the failure was on something being larger than max-integer.

  My current status, moving over to the Indexing Service, is that I don't think I'll be able to make this work until I can control all the processes that are writing to /tmp.  I took a smaller file from my data, representing one hour of data, and I was not able to ingest it because the plumber still writes to /tmp (/tmp/persistent/task, to be exact), even if I set the following properties in both the Overlord and the Middle Manager:

-Ddruid.indexer.baseDir=/data1/tmp/base
-Ddruid.indexer.fork.property.druid.indexer.baseDir=/data1/tmp/base
-Ddruid.indexer.hadoopWorkingPath=/data1/tmp/hadoop
-Ddruid.indexer.fork.property.druid.indexer.hadoopWorkingPath=/data1/tmp/hadoop
-Ddruid.indexer.taskDir=/data1/tmp/persistent
-Ddruid.indexer.runner.taskDir=/data1/tmp/persistent
-Ddruid.indexer.fork.property.druid.indexer.taskDir=/data1/tmp/persistent
-Ddruid.indexer.fork.property.druid.indexer.runner.taskDir=/data1/tmp/persistent
-Djava.io.tmpdir=/data1/tmp/middleManager
-Ddruid.indexer.fork.property.java.io.tmpdir=/data1/tmp/middleManager

(I had noticed the "druid.indexer.baseDir" property when searching through the repo, so I added that also).  Despite the above properties, I still get the following spills occurring:

2014-01-11 04:03:42,112 INFO [task-runner-0] io.druid.indexing.common.index.YeOldePlumberSchool - Spilling index[0] with rows[500000] to: /tmp/persistent/task/index_am_extract_2aug2013_00Z_simple_index_2014-01-11T04:02:24.171Z
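
A small sketch for checking a task log for spills that land outside the directory you intended, based on the message format in the line above. The function name and the expected prefix are illustrative.

```python
import re

# Matches the "Spilling index[...] with rows[...] to: <path>" message shown above.
SPILL_RE = re.compile(r"Spilling index\[(\d+)\] with rows\[(\d+)\] to: (\S+)")


def spills_outside(lines, expected_prefix):
    """Return spill paths that do not start with expected_prefix."""
    outside = []
    for line in lines:
        match = SPILL_RE.search(line)
        if match and not match.group(3).startswith(expected_prefix):
            outside.append(match.group(3))
    return outside
```

Running it over the task log with `expected_prefix="/data1/tmp"` lists every spill that still went somewhere else, such as the /tmp/persistent/task path above.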

At this point I've chopped my data down to one hour, and if I have to subdivide it any further I don't know how I'm going to be able to merge the resulting segments without the latest fragment overwriting the previously-loaded ones.

If I can stop the write to /tmp, it would be a big help.  Appreciate any feedback you have.

Thanks again -- Wayne

Wayne Adams

Jan 11, 2014, 12:35:22 PM
to druid-de...@googlegroups.com
Hi Fangjin:

  I re-ran my full-day datafile using the old-style Hadoop indexer and here is how the process ends.  Appreciate if you get a chance to look at this, and please let me know if you need to see any directory listings, etc.  The file I was loading is:

[ec2-user@ip-10-9-180-46 dayfiles]$ wc EXTRACT_20130802_UTC.txt
   38522966   976650903 16772155513 EXTRACT_20130802_UTC.txt

and the end of the log follows:

2014-01-11 10:03:21,509 INFO [Thread-2] io.druid.segment.IndexMerger - Completed dimension[timestamp] in 5,311 millis.
2014-01-11 10:03:21,509 INFO [Thread-2] io.druid.segment.IndexMerger - outDir[/data1/druid/druid-tmp/base971758480690827426flush/merged/v8-tmp] completed inverted.drd in 1,307,386 millis.
2014-01-11 10:03:21,789 INFO [communication thread] org.apache.hadoop.mapred.LocalJobRunner - reduce > reduce
2014-01-11 10:03:22,114 WARN [Thread-2] org.apache.hadoop.mapred.LocalJobRunner - job_local_0001
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
        at com.google.common.io.Files.map(Files.java:850)
        at com.google.common.io.Files.map(Files.java:837)
        at com.google.common.io.Files.map(Files.java:804)
        at com.google.common.io.Files.map(Files.java:776)
        at com.metamx.common.io.smoosh.FileSmoosher.add(FileSmoosher.java:113)
        at com.metamx.common.io.smoosh.Smoosh.smoosh(Smoosh.java:63)
        at io.druid.segment.IndexMerger.makeIndexFiles(IndexMerger.java:822)
        at io.druid.segment.IndexMerger.merge(IndexMerger.java:306)
        at io.druid.segment.IndexMerger.mergeQueryableIndex(IndexMerger.java:168)
        at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:362)
        at io.druid.indexer.IndexGeneratorJob$IndexGeneratorReducer.reduce(IndexGeneratorJob.java:237)

        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:260)
2014-01-11 10:03:22,471 INFO [main] org.apache.hadoop.mapred.JobClient - Job complete: job_local_0001
2014-01-11 10:03:22,471 INFO [main] org.apache.hadoop.mapred.JobClient - Counters: 20
2014-01-11 10:03:22,471 INFO [main] org.apache.hadoop.mapred.JobClient -   File Output Format Counters
2014-01-11 10:03:22,471 INFO [main] org.apache.hadoop.mapred.JobClient -     Bytes Written=8
2014-01-11 10:03:22,471 INFO [main] org.apache.hadoop.mapred.JobClient -   FileSystemCounters
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     FILE_BYTES_READ=4273260553370
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     FILE_BYTES_WRITTEN=4621856957739
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -   File Input Format Counters
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Bytes Read=16774199418
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -   Map-Reduce Framework
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Reduce input groups=1
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Map output materialized bytes=18236031221
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Combine output records=0
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Map input records=38522966
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Reduce shuffle bytes=0
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Physical memory (bytes) snapshot=0
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Reduce output records=0
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Spilled Records=149788576
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Map output bytes=18081936357
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     CPU time spent (ms)=0
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Total committed heap usage (bytes)=1054317477888
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Virtual memory (bytes) snapshot=0
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Combine input records=0
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Map output records=38522966
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     SPLIT_RAW_BYTES=67500
2014-01-11 10:03:22,472 INFO [main] org.apache.hadoop.mapred.JobClient -     Reduce input records=38522966
2014-01-11 10:03:22,486 INFO [main] io.druid.indexer.HadoopDruidIndexerJob - Deleting path[/data1/tmp/working/am_extract_old_hadoop_full_day/2014-01-11T050609.027Z]
2014-01-11 10:03:22,536 ERROR [main] io.druid.cli.CliHadoopIndexer - failure!!!!
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at io.druid.cli.CliHadoopIndexer.run(CliHadoopIndexer.java:104)
        at io.druid.cli.Main.main(Main.java:91)
Caused by: com.metamx.common.ISE: Job[class io.druid.indexer.IndexGeneratorJob] failed!
        at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:126)
        at io.druid.cli.CliInternalHadoopIndexer.run(CliInternalHadoopIndexer.java:50)
        at io.druid.cli.Main.main(Main.java:91)
        ... 6 more

Thanks for all your help!  -- Wayne

Eric Tschetter

Jan 11, 2014, 3:14:46 PM
to druid-de...@googlegroups.com
Wow, nice! Wayne, you seem to have generated a segment with inverted
indexes larger than 2GB total.
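
For context on the limit in that stack trace: a single Java file mapping cannot exceed Integer.MAX_VALUE bytes. A rough sketch for finding files over that size in a working directory follows; the helper names are made up, and which files the merger actually maps is not something this check knows about.

```python
import os

JAVA_INT_MAX = 2**31 - 1  # 2,147,483,647 bytes, just under 2 GiB


def exceeds_single_mapping(size_bytes):
    """True if a file of this size is too large for one mapping."""
    return size_bytes > JAVA_INT_MAX


def oversized_files(directory):
    """Yield (path, size) for files under directory that exceed the limit."""
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            size = os.path.getsize(path)
            if exceeds_single_mapping(size):
                yield path, size
```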

I just got caught up on this thread and you mentioned that you
separated your data out into hour chunks to get some stuff loaded and
that you were worried about doing that for everything. I think you
should actually do that instead. With Druid, you can index each
individual hour into an hour segment and Druid will expose them to you
the same as if you had indexed the day together.

In general, we've kept segments to less than 10 million rows and
actually try to target 5 million or so per segment. IIRC, you have 37
million rows generated regularly over 1 day, so if you do them in
hourly chunks, you will have roughly 1.5M rows per segment, which
should be good. You should be able to do this by specifying the
segment granularity at "hour".
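
As a rough check on that arithmetic, here is a minimal Python sketch. It is not part of Druid; the function name is made up for illustration, and it assumes rows are spread evenly across the day, which real traffic rarely is. The row count is the "Map output records" figure from the job log earlier in the thread.

```python
# Back-of-the-envelope estimate only; this is not how Druid sizes segments.
HOURS_PER_DAY = 24


def rows_per_hourly_segment(rows_per_day: int) -> float:
    """Average rows per segment if one day is split into hourly segments.

    Assumes (hypothetically) that rows are evenly distributed over the day.
    """
    return rows_per_day / HOURS_PER_DAY


# 38,522,966 is the "Map output records" count from the job log in this thread.
daily_rows = 38_522_966
estimate = rows_per_hourly_segment(daily_rows)
print(f"{estimate:,.0f} rows per hourly segment on average")
```

Under that even-spread assumption the average lands well below the 5 million row target mentioned above, though a busy hour could still be several times larger than the mean.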

Also, you were asking about the partitionSize; what values did you
set for that? A targetPartitionSize of "5000000", for example,
specifies that you want roughly 5,000,000 rows per segment. I forget
the exact algorithm that it uses to pick a partition dimension, but I
know it depends on finding boundaries that get close to the target,
somehow. If you just index at hourly granularity, though, you won't
have to worry about partitioning, because the segments will be small
enough.
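
To make the "roughly N rows per segment" idea concrete, the following Python sketch estimates how many partitions a given target would imply. This is plain arithmetic under stated assumptions, not Druid's partitioning algorithm (which also has to pick a dimension and find boundaries); the function name is hypothetical, and treating a non-positive target as "do not partition" is an assumption made for the example.

```python
import math


def estimated_partitions(total_rows: int, target_partition_size: int) -> int:
    """Estimate how many partitions a row-count target would imply.

    Assumption: a non-positive target means "do not partition", so the
    whole interval stays in a single segment.
    """
    if target_partition_size <= 0:
        return 1
    return max(1, math.ceil(total_rows / target_partition_size))


# A full day from the job log in this thread, against a 5,000,000 row target.
print(estimated_partitions(38_522_966, 5_000_000))

# One hour of the same data, assuming rows are evenly spread over 24 hours.
print(estimated_partitions(38_522_966 // 24, 5_000_000))
```

The second call illustrates the point above: at hourly granularity the estimate is a single partition, so no partitioning dimension is needed.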

Sorry for the trouble you've been having. The ingestion code was
written with the assumption that large files would be indexed via
Hadoop jobs on a Hadoop cluster, and I think you've proven that it has
many areas that can be further improved ;).

--Eric

Wayne Adams

Jan 13, 2014, 1:38:53 AM
to druid-de...@googlegroups.com
Hi, Eric.  Responses inline...


On Saturday, January 11, 2014 1:14:46 PM UTC-7, Eric Tschetter wrote:
> Wow, nice!  Wayne, you seem to have generated a segment with inverted
> indexes larger than 2GB total.

Thanks!  I do try to go where no user has gone before...  :)
 

> I just got caught up on this thread and you mentioned that you
> separated your data out into hour chunks to get some stuff loaded and
> that you were worried about doing that for everything.  I think you
> should actually do that instead.  With Druid, you can index each
> individual hour into an hour segment and Druid will expose them to you
> the same as if you had indexed the day together.

OK, just tried that now, in 0.6, but using the old-style ("io.druid.cli.Main index hadoop")
indexer.  This data loads fine, and (happily) fairly quickly.  Because of the /tmp
issue (see inline below), at least right now it appears the only way I can load this
is using this indexer, and not using either of the index tasks of the Indexing Service.
But this is good, and I hope to be able to eventually control all the node tmp-dir
settings...
 

> In general, we've kept segments to less than 10 million rows and
> actually try to target 5 million or so per segment.  IIRC, you have 37
> million rows generated regularly over 1 day, so if you do them in
> hourly chunks, you will have roughly 1.5M rows per segment, which
> should be good.  You should be able to do this by specifying the
> segment granularity at "hour".
>
> Also, you were asking about the partitionSize; what values did you
> set for that?  A targetPartitionSize of "5000000", for example,
> specifies that you want roughly 5,000,000 rows per segment.  I forget
> the exact algorithm that it uses to pick a partition dimension, but I
> know it depends on finding boundaries that get close to the target,
> somehow.  If you just index at hourly granularity, though, you won't
> have to worry about partitioning, because the segments will be small
> enough.

The issue I was having is that I get an exception, and the load fails, if I set the targetPartitionSize
to any value other than 0.  I've not yet got that to work with any nonzero value...  That's when I
get the "No suitable partitioning dimension found!" message.

 

> Sorry for the trouble you've been having.  The ingestion code was
> written with the assumption that large files would be indexed via
> Hadoop jobs on a Hadoop cluster, and I think you've proven that it has
> many areas that can be further improved ;).

That's cool.  I'm still trying to determine the best way to ingest
data of this size and type, and at what granularity.

Re:  the Indexing Service in 0.6 -- I am still getting zinged by out-of-space
on /tmp, even after setting all the properties I mentioned earlier in this thread
to Fangjin.  With my /tmp partition size, I can't even load one hour of data.  I'm
wondering if there's some other process (other than Overlord and Middle Manager)
where I need to set "druid.indexer.taskDir", either as a process property or a child/fork
process property, or if maybe in this case the prop isn't being propagated to the
child after all.  After a quick search of the codebase, it appears the offending
property is the TaskConfig baseTaskDir, but even if I set the property to a different
value, it still blows out /tmp/persistent/task instead of using the directory I specify.

Tomorrow I'm going to tweak the build a little to see if I can at least manually
force the output to go elsewhere, but I was hoping you might know offhand if I'm setting
the wrong property, or setting it on the wrong node. Or if maybe the property isn't
being propagated to a child process.
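
One quick way to see which location is actually filling up is to compare free space on the default task directory and on the intended one while a task runs. The Python sketch below is only a diagnostic aid, not a Druid tool: the helper names are made up, `/tmp/persistent/task` is the path mentioned above, and `/data1/tmp` is a hypothetical replacement directory borrowed from the working path in the earlier job log.

```python
import os
import shutil


def nearest_existing(path: str) -> str:
    """Walk up from path until a directory that exists is found."""
    path = os.path.abspath(path)
    while not os.path.exists(path):
        parent = os.path.dirname(path)
        if parent == path:  # reached the filesystem root
            break
        path = parent
    return path


def free_gib(path: str) -> float:
    """Free space, in GiB, on the filesystem that would hold path."""
    return shutil.disk_usage(nearest_existing(path)).free / 2**30


# Assumed locations: the default seen in this thread and a hypothetical override.
for candidate in ("/tmp/persistent/task", "/data1/tmp"):
    print(f"{candidate}: {free_gib(candidate):.1f} GiB free "
          f"(measured at {nearest_existing(candidate)})")
```

Running this before and during a task shows whether free space is dropping under /tmp or under the configured directory, which helps tell a property that is not being picked up from one that is simply set on the wrong process.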

Assuming I get past the /tmp issue, if I use Indexing Service for these files, would you
recommend I use the "index" task, or the "index_hadoop" task, to load files of this size?

Thanks again, Eric (and Fangjin!)

Regards, Wayne
