Data missing after successful ingestion tasks


Amy Troschinetz

Sep 30, 2014, 4:34:56 PM
to druid-de...@googlegroups.com
I'm trying to ingest data from 82 CSV files into Druid using the local firehose and the indexing service.

Each CSV file has 100,000 rows in it, each row is unique across all files.

Here is the JSON template for the indexing tasks:

{
    "type": "index",
    "dataSource": "simple_outclick",
    "granularitySpec":
    {
        "type": "uniform",
        "gran": "DAY",
        "intervals": ["$STARTDATE/$ENDDATE"]
    },
    "indexGranularity": "second",
    "aggregators": [
        {"type": "count", "name": "count"}
    ],
    "firehose":
    {
        "type": "local",
        "baseDir": "$(pwd)",
        "filter": "$DATAFILE",
        "parser":
        {
            "timestampSpec":
            {
                "column": "clickDate",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "data":
            {
                "format": "tsv",
                "columns": ["masterClickId", "user", "userFingerprint", "siteId", "couponId", "partner", "splitTest", "kenshooId", "clickDate", "created", "application", "ip", "webAnalyticsId", "term", "outclickTypeId", "beacon_session", "calc_mediumCalc", "calc_network", "calc_clickDatePT", "calc_device", "calc_browser", "calc_clickPageClean", "calc_monetizable", "calc_affiliateLink", "calc_updated", "coupon_couponId", "coupon_link", "coupon_domain", "coupon_siteId", "coupon_superFeature", "coupon_exclusive", "coupon_noVotes", "coupon_yesVotes", "coupon_score", "coupon_position", "coupon_rank", "coupon_groupLabel", "cust_emailId", "goog_utmaDomainHash", "goog_utmaVisitorId", "goog_utmaFirstVisit", "goog_utmaPreviousVisit", "goog_utmaVisitorSessions", "goog_utmzDomainHash", "goog_utmzCookieLastSet", "goog_utmzTotalVisitorSession", "goog_utmzTotalSources", "goog_utmzSource", "goog_utmzCampaign", "goog_utmzMedium", "goog_utmzKeyword", "http_referrer", "http_originalReferrer", "http_referrerLanding", "http_originalLanding", "http_userAgent", "http_clickPage", "http_sessionReferrer", "map_clickId", "nat_appInstallationId", "nat_udid", "nat_appVersion", "nat_geoFenceId", "nat_latitude", "nat_longitude", "nat_geoCouponCount", "prod_productCardId", "prod_ideaSection", "prod_ideaSectionPosition", "prod_originalPrice", "prod_finalPrice", "prod_couponId", "prod_startDate", "prod_endDate", "prod_deepLink", "prod_siteId", "prod_productCardPosition", "site_siteId", "site_domain", "site_link", "site_wwwRequired", "syn_partnerRef", "test_campaignId", "test_variationId", "test_couponSearch", "test_slicer", "test_treatment", "test_fruit", "test_veg", "test_ginsu", "uniq_masterClickId", "uniq_keyHash", "url_urlSource", "url_urmMedium", "url_urlContent", "url_urlCampaign", "url_urlAdGroup", "url_urlKeyword", "utm_utmSource", "utm_utmMedium", "utm_utmContent", "utm_utmCampaign", "utm_utmSession", "utm_utmVisit", "utm_utmKeyword", "va_clickSection", "va_visitCount", "sem_gclid", 
"cust_emailUUId", "userUUId", "userQualifier", "deviceFingerprint"],
                "dimensions": ["masterClickId", "outclickTypeId", "siteId", "couponId", "calc_mediumCalc"]
            }
        }
    }
}

I kick off the ingestion process using the following command (run on my overlord node):

$ curl --silent --show-error -d @$TASK -H 'content-type: application/json' 'localhost:8080/druid/indexer/v1/task'

Where $TASK is the path to the generated JSON file.

In my coordinator console I can see that all my tasks have completed with statusCode set to SUCCESS.

The table looks something like this:

index_simple_outclick_2014-09-30T19:51:59.559Z SUCCESS
[...]
index_simple_outclick_2014-09-30T19:54:05.976Z SUCCESS

If I go into the log for any of these tasks, I see a line like the following:

2014-09-30 20:04:15,834 INFO [task-runner-0] io.druid.indexing.common.index.YeOldePlumberSchool - Spilling index[0] with rows[100000] to: /tmp/persistent/task/index_simple_outclick_2014-09-30T19:53:52.433Z/work/simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z_0/simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z/spill0

Note the number of rows: 100,000.

Now if I do a timeBoundary query on my data:

$ curl --silent --show-error -d @timeboundary_simple.json -H 'content-type: application/json' 'http://my-druid-broker-ip:8080/druid/v2/' --data-urlencode 'pretty' | python -mjson.tool

[
    {
        "result": {
            "maxTime": "2014-09-28T00:03:06.000Z",
            "minTime": "2014-09-26T23:40:01.000Z"
        },
        "timestamp": "2014-09-26T23:40:01.000Z"
    }
]

So now if I want to see how many events occurred each day I do:

$ curl --silent --show-error -d @by_day_simple.json -H 'content-type: application/json' 'http://my-druid-broker-ip:8080/druid/v2/' --data-urlencode 'pretty' | python -mjson.tool

Where by_day_simple.json is:

{
    "queryType": "groupBy", 
    "dataSource": "simple_outclick", 
    "granularity": "day", 
    "dimensions": [], 
    "aggregations": [
        {
            "type": "longSum",
            "fieldName": "count",
            "name": "totalClicks"
        }
    ], 
    "intervals": ["2014-09-26/2014-09-29"]
}

I get the following VERY unexpected result:

[
    {
        "event": {
            "totalClicks": 50384
        },
        "timestamp": "2014-09-26T00:00:00.000Z",
        "version": "v1"
    },
    {
        "event": {
            "totalClicks": 100000
        },
        "timestamp": "2014-09-27T00:00:00.000Z",
        "version": "v1"
    },
    {
        "event": {
            "totalClicks": 2275
        },
        "timestamp": "2014-09-28T00:00:00.000Z",
        "version": "v1"
    }
]

So where did all my events go???
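As a sanity check, here is a small Python sketch (standard library only) that rebuilds the same groupBy spec as by_day_simple.json and tallies the rows returned above. With 82 files of roughly 100,000 unique rows each, the expected total is about 8,200,000 events, yet the three rows above sum to only 152,659:

```python
# Sketch: rebuild the groupBy spec and total the per-day counts Druid
# returned. Posting the query to the broker (via urllib) is left out,
# since the broker hostname above is redacted.
def build_daily_count_query(datasource="simple_outclick",
                            interval="2014-09-26/2014-09-29"):
    """The same spec as by_day_simple.json, as a Python dict."""
    return {
        "queryType": "groupBy",
        "dataSource": datasource,
        "granularity": "day",
        "dimensions": [],
        "aggregations": [
            {"type": "longSum", "fieldName": "count", "name": "totalClicks"}
        ],
        "intervals": [interval],
    }

def total_clicks(rows):
    """Sum totalClicks across the rows a groupBy query returns."""
    return sum(r["event"]["totalClicks"] for r in rows)

rows = [
    {"event": {"totalClicks": 50384}, "timestamp": "2014-09-26T00:00:00.000Z"},
    {"event": {"totalClicks": 100000}, "timestamp": "2014-09-27T00:00:00.000Z"},
    {"event": {"totalClicks": 2275}, "timestamp": "2014-09-28T00:00:00.000Z"},
]
print(total_clicks(rows))  # 152659 -- nowhere near the ~8,200,000 ingested
```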



Fangjin Yang

Sep 30, 2014, 4:49:51 PM
to druid-de...@googlegroups.com
Hi, see inline.

On Tue, Sep 30, 2014 at 1:34 PM, Amy Troschinetz <atrosc...@rmn.com> wrote:
I'm trying to ingest data from 82 CSV files into Druid using the local firehose and the indexing service.

Each CSV file has 100,000 rows in it, each row is unique across all files.

Here is the JSON template for the indexing tasks:

{
    "type": "index",
    "dataSource": "simple_outclick",
    "granularitySpec":
    {
        "type": "uniform",
        "gran": "DAY",
        "intervals": ["$STARTDATE/$ENDDATE"]

What are the start/end dates supposed to be?
 Note the segment: simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z_0

That is for a certain interval of data.

Now If I do a timeBoundary query on my data:

$ curl --silent --show-error -d @timeboundary_simple.json -H 'content-type: application/json' 'http://my-druid-broker-ip:8080/druid/v2/' --data-urlencode 'pretty' | python -mjson.tool

[
    {
        "result": {
            "maxTime": "2014-09-28T00:03:06.000Z",
            "minTime": "2014-09-26T23:40:01.000Z"
        },
        "timestamp": "2014-09-26T23:40:01.000Z"
    }
]

It seems much of the 2014-09-26 data was not ingested. It also seems only the first few minutes of 2014-09-28 were ingested. The only complete day appears to be 2014-09-27 to 2014-09-28.
I think the interval of ingestion and the ingested files will provide clues. 



Amy Troschinetz

Sep 30, 2014, 4:51:18 PM
to druid-de...@googlegroups.com
On further inspection, not all the files have 100,000 rows, but most of them do. I took a look at the druid_segments table in MySQL and it looks like some of the data has been set to used = 0. Maybe that's the issue?

mysql> select id, used from druid_segments where dataSource = "simple_outclick";
+--------------------------------------------------------------------------------------------+------+
| id                                                                                         | used |
+--------------------------------------------------------------------------------------------+------+
| simple_outclick_2014-09-26T00:00:00.000Z_2014-09-27T00:00:00.000Z_2014-09-30T18:22:38.684Z |    0 |
[...]
| simple_outclick_2014-09-26T00:00:00.000Z_2014-09-27T00:00:00.000Z_2014-09-30T20:02:36.830Z |    1 |
| simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T19:53:42.350Z |    0 |
[...]
| simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:04:54.548Z |    1 |
[...]
| simple_outclick_2014-09-28T00:00:00.000Z_2014-09-29T00:00:00.000Z_2014-09-30T19:57:58.369Z |    0 |
| simple_outclick_2014-09-28T00:00:00.000Z_2014-09-29T00:00:00.000Z_2014-09-30T19:58:32.725Z |    1 |
+--------------------------------------------------------------------------------------------+------+

Can I resolve this issue by changing the values for these rows to used = 1?


It seems that this behavior is expected. Am I not loading data correctly here? I have lots of CSV files (waaaay more than just 82 of them) that have overlapping data in terms of time boundary. I want to aggregate all the events in all the files.

Amy Troschinetz

Sep 30, 2014, 5:15:02 PM
to druid-de...@googlegroups.com
See inline.


On Tuesday, September 30, 2014 3:49:51 PM UTC-5, Fangjin Yang wrote:
Hi, see inline.

On Tue, Sep 30, 2014 at 1:34 PM, Amy Troschinetz <atrosc...@rmn.com> wrote:
        "intervals": ["$STARTDATE/$ENDDATE"]

What are the start/end dates supposed to be?

I was just using the minimum and maximum days (year, month, and day) found in each file. How do I deal with the issue that sometimes I will have data for the same time boundary split across multiple files? I need to merge all the data together, not overwrite it.
 

2014-09-30 20:04:15,834 INFO [task-runner-0] io.druid.indexing.common.index.YeOldePlumberSchool - Spilling index[0] with rows[100000] to: /tmp/persistent/task/index_simple_outclick_2014-09-30T19:53:52.433Z/work/simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z_0/simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z/spill0

Note the number of rows: 100,000.

 Note the segment: simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z_0

That is for a certain interval of data.

I didn't realize each segment was defined by the intervals specified in the ingestion task; I figured it would be defined by the data itself, and that the intervals given in the index task were more or less arbitrary hints.

So how do I go about ingesting this data? Do I have to post process it into ordered rows of monotonically increasing clickDate and then specify the indexing task intervals down to the second? I'd rather just merge all the data together somehow automatically if that's possible.

Fangjin Yang

Sep 30, 2014, 5:22:15 PM
to druid-de...@googlegroups.com
Hi, please see inline.


On Tuesday, September 30, 2014 1:51:18 PM UTC-7, Amy Troschinetz wrote:
On further inspection, not all the files have 100,000 rows, but most of them do. I took a look at the druid_segments table in MySQL and it looks like some of the data has been set to used = 0. Maybe that's the issue?

Druid does atomic swaps of segments. This means that if you have 2 segments that cover the exact same interval, Druid queries for data from the segment with the most recent version identifier.

Segments are uniquely identified by datasource_interval_version_partitionNumber(optional). In your case, you have multiple segments for the same time range, which is fine. Druid automatically invalidates segments with data that has been obsoleted by newer segments. 
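To make the versioning concrete, here is a toy Python sketch (not Druid code) of the overshadowing rule: among segments covering the same interval, the one with the newest version timestamp wins and the rest go unused.

```python
def live_segments(segment_ids):
    """Toy model of Druid's overshadowing: per (dataSource, interval),
    keep only the segment with the newest version.

    Ids here follow dataSource_start_end_version with no partition
    suffix, as in the druid_segments listing; ISO-8601 versions compare
    correctly as plain strings.
    """
    best = {}
    for sid in segment_ids:
        datasource, start, end, version = sid.rsplit("_", 3)
        key = (datasource, start, end)
        if key not in best or version > best[key][0]:
            best[key] = (version, sid)
    return {sid for _, sid in best.values()}

ids = [
    "simple_outclick_2014-09-26T00:00:00.000Z_2014-09-27T00:00:00.000Z_2014-09-30T18:22:38.684Z",
    "simple_outclick_2014-09-26T00:00:00.000Z_2014-09-27T00:00:00.000Z_2014-09-30T20:02:36.830Z",
]
# Only the 20:02:36.830Z version stays live; the earlier one ends up used = 0.
```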

mysql> select id, used from druid_segments where dataSource = "simple_outclick";
+--------------------------------------------------------------------------------------------+------+
| id                                                                                         | used |
+--------------------------------------------------------------------------------------------+------+
| simple_outclick_2014-09-26T00:00:00.000Z_2014-09-27T00:00:00.000Z_2014-09-30T18:22:38.684Z |    0 |
[...]
| simple_outclick_2014-09-26T00:00:00.000Z_2014-09-27T00:00:00.000Z_2014-09-30T20:02:36.830Z |    1 |
| simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T19:53:42.350Z |    0 |
[...]
| simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:04:54.548Z |    1 |
[...]
| simple_outclick_2014-09-28T00:00:00.000Z_2014-09-29T00:00:00.000Z_2014-09-30T19:57:58.369Z |    0 |
| simple_outclick_2014-09-28T00:00:00.000Z_2014-09-29T00:00:00.000Z_2014-09-30T19:58:32.725Z |    1 |
+--------------------------------------------------------------------------------------------+------+

Can I resolve this issue by changing the values for these rows to used = 1?

Not recommended; Druid will just end up invalidating them again anyway.


It seems that this behavior is expected. Am I not loading data correctly here? I have lots of CSV files (waaaay more than just 82 of them) that have overlapping data in terms of time boundary. I want to aggregate all the events in all the files.

This should be fine as we do this as well. See next email. 

Fangjin Yang

Sep 30, 2014, 5:25:54 PM
to druid-de...@googlegroups.com
Inline.


On Tuesday, September 30, 2014 2:15:02 PM UTC-7, Amy Troschinetz wrote:
See inline.

On Tuesday, September 30, 2014 3:49:51 PM UTC-5, Fangjin Yang wrote:
Hi, see inline.

On Tue, Sep 30, 2014 at 1:34 PM, Amy Troschinetz <atrosc...@rmn.com> wrote:
        "intervals": ["$STARTDATE/$ENDDATE"]

What are the start/end dates supposed to be?

I was just using the minimum and maximum days (year and month and day) in each file.

Are you creating unique indexing tasks per file, or a single indexing task that ingests all files? The former may lead to the behaviour you are seeing, and the latter, I believe, is what you actually want to do.
 
How do I deal with the issue that sometimes I will have data for the same time boundary split across multiple files? I need to merge all the data together, not overwrite it.
 

2014-09-30 20:04:15,834 INFO [task-runner-0] io.druid.indexing.common.index.YeOldePlumberSchool - Spilling index[0] with rows[100000] to: /tmp/persistent/task/index_simple_outclick_2014-09-30T19:53:52.433Z/work/simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z_0/simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z/spill0

Note the number of rows: 100,000.

 Note the segment: simple_outclick_2014-09-27T00:00:00.000Z_2014-09-28T00:00:00.000Z_2014-09-30T20:03:49.816Z_0

That is for a certain interval of data.

I didn't realize each segment was defined by the intervals specified in the ingestion task; I figured it would be defined by the data itself, and that the intervals given in the index task were more or less arbitrary hints.

Druid will create the segment files based on the interval and segment granularity of your data. I was just pointing out that the logs you showed cover only a single segment, for which correct results are being generated.

So how do I go about ingesting this data? Do I have to post process it into ordered rows of monotonically increasing clickDate and then specify the indexing task intervals down to the second? I'd rather just merge all the data together somehow automatically if that's possible.

Looking at these two params:

    "baseDir": "$(pwd)",
    "filter": "$DATAFILE"

Does baseDir include every single file you want to ingest?

Amy Troschinetz

Sep 30, 2014, 5:27:42 PM
to druid-de...@googlegroups.com
See inline.


On Tuesday, September 30, 2014 4:22:15 PM UTC-5, Fangjin Yang wrote:

Druid does atomic swaps of segments. This means that if you have 2 segments that cover the exact same interval, Druid queries for data from the segment with the most recent version identifier.

Segments are uniquely identified by datasource_interval_version_partitionNumber(optional). In your case, you have multiple segments for the same time range, which is fine. Druid automatically invalidates segments with data that has been obsoleted by newer segments. 

Ah, but in my case I don't want to have "newer segments". Each of those segments is actually just part of the entire dataset. I don't want to invalidate any of those segments as they are all valid data for the same date range.

Amy Troschinetz

Sep 30, 2014, 5:30:10 PM
to druid-de...@googlegroups.com
On Sep 30, 2014, at 4:25 PM, Fangjin Yang <fangj...@gmail.com> wrote:

Are you creating unique indexing tasks per file, or a single indexing task that ingests all files? The former may lead to the behaviour you are seeing, and the latter, I believe, is what you actually want to do.

Yes, I am creating unique indexing tasks per file.
 
Looking at these two params:

    "baseDir": "$(pwd)",
    "filter": "$DATAFILE"

Does baseDir include every single file you want to ingest?

Yes, it does.

Fangjin Yang

Sep 30, 2014, 5:35:32 PM
to druid-de...@googlegroups.com
I should learn to type :P

What you want to do is create a single indexing job for all your files and not indexing jobs per file.

Fangjin Yang

Sep 30, 2014, 5:36:12 PM
to druid-de...@googlegroups.com
The reason is that multiple indexing jobs will generate multiple segments for the same range of time and cause Druid to only use the most recently generated segments.

Amy Troschinetz

Sep 30, 2014, 5:40:47 PM
to druid-de...@googlegroups.com
On Sep 30, 2014, at 4:35 PM, Fangjin Yang <fan...@metamarkets.com> wrote:

What you want to do is create a single indexing job for all your files and not indexing jobs per file.

Hrm, I'm not sure how to do that. I tried using the filter "*.csv" but that results in the error:
com.metamx.common.ISE: Found no files to ingest! Check your schema.
The documentation doesn't indicate what kind of values are valid for the filter as far as I can tell: http://druid.io/docs/latest/Tasks.html

On Sep 30, 2014, at 4:36 PM, Fangjin Yang <fan...@metamarkets.com> wrote:

The reason is that multiple indexing jobs will generate multiple segments for the same range of time and cause Druid to only use the most recently generated segments.

Is there a way to disable this behavior? There will be lots of times where I want to ingest more data for a date range I have already ingested. I just want the new data merged into the old data, without invalidating any existing segments.

It's nice that I can overwrite data in the case that I need to do something like a backfill and replace bad data, but that is not my normal use case. The normal use case is "I have *more* data, let's ingest it and add it to our previously ingested data, yay! :)"


Fangjin Yang

Sep 30, 2014, 6:00:01 PM
to druid-de...@googlegroups.com
Apologies for the trouble you are having. The firehose should be documented much better.

I've created https://github.com/metamx/druid/pull/774/files to improve ingestion using the local firehose. You can now pass in wildcard patterns for your files. There are also docs in the PR.
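For reference, with wildcard support the per-file firehose can be collapsed into a single one along these lines (a sketch; the baseDir path is a placeholder, and the exact filter semantics are per the docs in the PR):

```json
{
    "type": "local",
    "baseDir": "/data/simple_outclick",
    "filter": "*.csv"
}
```

with the task's granularitySpec intervals widened to span every file, e.g. ["2014-09-26/2014-09-29"].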

Amy Troschinetz

Oct 1, 2014, 10:19:10 AM
to druid-de...@googlegroups.com
On Tuesday, September 30, 2014 5:00:01 PM UTC-5, Fangjin Yang wrote:
Apologies for the trouble you are having. The firehose should be documented much better.

I've created https://github.com/metamx/druid/pull/774/files to improve ingestion using the local firehose. You can now pass in wildcard patterns for your files. There are also docs in the PR.

Sweet! Using a filter that accepts all the CSV files in the directory worked. However, I'm limited by disk space in the number of files I can ingest at a time and will still have to do multiple ingestions of sets of files. How can I ensure that these multiple ingestion tasks won't overwrite any previously loaded data? 
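One way to stay within a disk budget without overwriting anything is to make each ingestion round own complete, disjoint day intervals: every file containing rows for a given day must go into that day's batch. A hedged Python sketch (days_covered is a hypothetical helper you would implement by scanning a file's clickDate column):

```python
from collections import defaultdict

def plan_batches(files, days_covered):
    """Group files by the days their rows cover.

    Each day's indexing task must include *every* file that touches that
    day; otherwise a later task for the same interval would produce a
    newer segment version and overshadow the earlier rows.
    """
    by_day = defaultdict(set)
    for path in files:
        for day in days_covered(path):
            by_day[day].add(path)
    return dict(by_day)

# Example with a stubbed-out days_covered:
coverage = {"a.csv": ["2014-09-26", "2014-09-27"], "b.csv": ["2014-09-27"]}
plan = plan_batches(["a.csv", "b.csv"], coverage.get)
# plan["2014-09-27"] == {"a.csv", "b.csv"} -- both files touch that day.
```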

Fangjin Yang

Oct 1, 2014, 6:07:31 PM
to druid-de...@googlegroups.com
For data sets larger than 1 GB, we recommend using the Hadoop-based batch ingestion task. This is what several orgs use in production, and many files can be passed to it. We typically see orgs upload all the files they need to ingest to central deep storage somewhere (HDFS, S3, etc.).