Loading files from S3 with wildcards in index task - Daunting


Varaga

May 10, 2018, 11:35:42 AM
to Druid User
Hi Druiders,

     I'm trying to index a large amount of data in S3 buckets. There are about 3000 files generated every day, going back one year.
     We do not want to set up batch ingestion with an EMR cluster, so I have defined an index task to load these files. The ioConfig uses the static-s3 firehose as below.

     The task seems to complete successfully, but based on the task logs no dimensions are indexed.
     I'm wondering whether the wildcard works at all?
 
"ioConfig" : {
     
"type" : "index",
     
"firehose" : {
       
"type" : "static-s3",
       
"prefixes": ["s3://<bucket>/2018/05/07/*.gz"]
     
},
     
"appendToExisting" : false
   
},
   
"tuningConfig" : {
     
"type" : "index",
     
"targetPartitionSize" : 5000000,
     
"maxRowsInMemory" : 25000,
     
"forceExtendableShardSpecs" : true
   
}



     Secondly, I tried without the wildcard, just with the directory as below. It seemed to pick up all the files under the directory, but it failed with an OOM (Java heap space). The middleManager is configured with a 64 MB heap. The peons run with a 2 GB heap and 1.5 GB of max direct memory. The granularitySpec has DAY for segments and HOUR for queries...

"firehose" : {
       
"type" : "static-s3",
       
"prefixes": ["s3://<bucket>/2018/05/07/"]
     
},



   Thirdly, I changed the segment granularity to HOUR and the query granularity to MINUTE, thinking this would reduce the heap footprint. However, that failed with a heap-space OOM as well.
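
   (For reference, the modified granularitySpec is roughly as follows; the interval shown is just the day being indexed and may not match our exact spec.)

"granularitySpec" : {
  "type" : "uniform",
  "segmentGranularity" : "HOUR",
  "queryGranularity" : "MINUTE",
  "intervals" : ["2018-05-07/2018-05-08"]
}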

   Unfortunately, all of our files have randomly generated, timestamped filenames in UTC. The file sizes vary as well.

   Can someone suggest the best way to index a day's data through the static-s3 firehose?

Best Regards
Varaga
  

Jihoon Son

May 10, 2018, 3:20:04 PM
to druid...@googlegroups.com
Hi Varaga,

The 'prefixes' of the static-s3 firehose are object key prefixes; a regex or wildcard filter is not supported yet. So, if you set 'prefixes' to ["s3://<bucket>/2018/05/07/"], the firehose will read all objects under 's3://<bucket>/2018/05/07/'. I raised an issue for supporting a regex filter here: https://github.com/druid-io/druid/issues/5772.
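
For example, to read the whole day you would drop the wildcard and keep only the common key prefix (the bucket name is a placeholder):

"firehose" : {
  "type" : "static-s3",
  "prefixes" : ["s3://<bucket>/2018/05/07/"]
}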

Regarding the OOM, can you post the stack trace here? What kind of OOM did you see?

Probably your maxRowsInMemory is too low unless your data has a lot of columns (its default is 75000, which is already set conservatively). If maxRowsInMemory is too low, Druid tasks persist in-memory data too frequently, generating too many intermediate segment files. Those files must be merged before publishing, which can cause an OOM if too many files need to be merged.
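
As a rough sketch (the right value depends on your row width and heap size), your tuningConfig with maxRowsInMemory raised back to at least the default would look like:

"tuningConfig" : {
  "type" : "index",
  "targetPartitionSize" : 5000000,
  "maxRowsInMemory" : 75000,
  "forceExtendableShardSpecs" : true
}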

Jihoon

On Thu, May 10, 2018 at 8:35 AM, Varaga <chakrav...@gmail.com> wrote:

Varaga

May 11, 2018, 7:05:21 AM
to Druid User
Hi Jihoon,

   Thanks for your response. I tried setting `maxRowsInMemory` to 500K and 1 million rows, with `targetPartitionSize` at 1 million rows, and both runs failed with an OOM (heap space). A day's files will have roughly 2 million rows.

   The files are JSON documents and each of them has at least 30-40 fields (in Druid terms).
Out of these, only 4 dimensions are indexed. I use a flattenSpec, as our JSON documents are nested object structs and the 4 dimensions are JSONPath parsed/flattened.
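
   (For reference, the parser is shaped roughly like this; the field names and paths below are placeholders, not our real schema.)

"parser" : {
  "type" : "string",
  "parseSpec" : {
    "format" : "json",
    "flattenSpec" : {
      "useFieldDiscovery" : false,
      "fields" : [
        { "type" : "path", "name" : "dim1", "expr" : "$.payload.meta.dim1" },
        { "type" : "path", "name" : "dim2", "expr" : "$.payload.dim2" }
      ]
    },
    "timestampSpec" : { "column" : "timestamp", "format" : "auto" },
    "dimensionsSpec" : { "dimensions" : ["dim1", "dim2"] }
  }
}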
  
   Here is the stack trace:

  
2018-05-11T10:28:32,500 ERROR [task-runner-0-priority-0] io.druid.indexing.common.task.IndexTask - Encountered exception in DETERMINE_PARTITIONS.
com.amazonaws.SdkClientException: Failed to sanitize XML document destined for handler class com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser$ListObjectsV2Handler
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:214) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser.parseListObjectsV2Response(XmlResponsesSaxParser.java:315) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:88) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.model.transform.Unmarshallers$ListObjectsV2Unmarshaller.unmarshall(Unmarshallers.java:77) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:62) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.internal.S3XmlResponseHandler.handle(S3XmlResponseHandler.java:31) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:70) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1553) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1271) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4229) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4176) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4170) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at com.amazonaws.services.s3.AmazonS3Client.listObjectsV2(AmazonS3Client.java:865) ~[aws-java-sdk-bundle-1.11.199.jar:?]
        at io.druid.storage.s3.S3Utils$2.fetchNextBatch(S3Utils.java:124) ~[?:?]
        at io.druid.storage.s3.S3Utils$2.next(S3Utils.java:147) ~[?:?]
        at io.druid.storage.s3.S3Utils$2.next(S3Utils.java:114) ~[?:?]
        at com.google.common.collect.Iterators.addAll(Iterators.java:357) ~[guava-16.0.1.jar:?]
        at com.google.common.collect.Lists.newArrayList(Lists.java:147) ~[guava-16.0.1.jar:?]
        at io.druid.firehose.s3.StaticS3FirehoseFactory.initObjects(StaticS3FirehoseFactory.java:138) ~[?:?]
        at io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.connect(PrefetchableTextFilesFirehoseFactory.java:167) ~[druid-api-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory.connect(PrefetchableTextFilesFirehoseFactory.java:89) ~[druid-api-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.indexing.common.task.IndexTask.collectIntervalsAndShardSpecs(IndexTask.java:716) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.indexing.common.task.IndexTask.createShardSpecsFromInput(IndexTask.java:645) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.indexing.common.task.IndexTask.determineShardSpecs(IndexTask.java:583) ~[druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.indexing.common.task.IndexTask.run(IndexTask.java:417) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:456) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:428) [druid-indexing-service-0.13.0-SNAPSHOT.jar:0.13.0-SNAPSHOT]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161]
Caused by: java.lang.OutOfMemoryError: Java heap space


    There are also other OOMs:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main-SendThread(ip-10-35-19-87.ec2.internal:2181)"

2018-05-11T10:28:32,499 DEBUG [JettyScheduler] org.eclipse.jetty.server.session - Scavenging sessions at 1526034512491
Exception in thread "HttpClient-Netty-Boss-0" java.lang.OutOfMemoryError: Java heap space
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:932)
        at java.util.concurrent.ThreadPoolExecutor.processWorkerExit(ThreadPoolExecutor.java:1025)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    It should also be noted that the task does not transition to FAILED. I presume the peon died, and the overlord console shows the task as still running!

Best Regards
Varaga

Jihoon Son

May 11, 2018, 1:50:11 PM
to druid...@googlegroups.com
Varaga,

This is interesting. It looks like the task failed while fetching the object list from S3. Was the number of input files about 3000, as specified above?
Would you post your middleManager configuration?

Jihoon

On Fri, May 11, 2018 at 4:05 AM, Varaga <chakrav...@gmail.com> wrote:

Varaga

May 14, 2018, 4:53:15 AM
to Druid User
Hi Jihoon,

    My bad, I was wrong about the number of files. There are actually 14528 files for that day. Shouldn't the firehose batch this and read a smaller set of files at a time?

    Here is my configuration:

   
druid.service=druid/middlemanager
druid.host=<Aurora Assigned Host:Port>
druid.port=<Aurora Assigned>

# HTTP server threads
druid.server.http.numThreads=40

# Processing threads and buffers
druid.processing.buffer.sizeBytes=36870912
druid.processing.numMergeBuffers=2
druid.processing.numThreads=2

# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog4j.configurationFile=/opt/druid/conf/druid/middleManager/log4j2.xml
druid.indexer.task.baseDir=/mnt/<Masked>/task
druid.indexer.logs.directory=/mnt/<Masked>/task/logs
#druid.indexer.task.restoreTasksOnRestart=true
druid.indexer.runner.startPort=40000

# Peon properties
druid.indexer.fork.property.druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=136870912
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.numThreads=2

# Number of tasks per middleManager
druid.worker.capacity=3
druid.worker.ip=localhost
druid.worker.version=0

Here is the jvm.config for the middleManager:

-server
-Xmx64m
-Xms64m
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

Chakravarthy varaga

May 14, 2018, 4:15:00 PM
to druid...@googlegroups.com
Hi Jihoon,

      Any updates here?

Best Regards
Varaga

Jihoon Son

May 14, 2018, 4:21:37 PM
to druid...@googlegroups.com
Hi Varaga,

I'm not sure what is occupying so much memory. Can you add "-XX:+HeapDumpOnOutOfMemoryError" to 'druid.indexer.runner.javaOpts' to get a heap dump file on the OOM error? You can use YourKit or VisualVM to analyze it.
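
For example, based on the javaOpts from your middleManager config, it would become something like this (the HeapDumpPath flag is optional, and the path shown is just an example):

druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/mnt/<Masked>/task -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Dlog4j.configurationFile=/opt/druid/conf/druid/middleManager/log4j2.xml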

In the meantime, I think there are two possible workarounds:

- Increase the heap size of the peon to something larger than 2 GB.
- Limit the max number of files ingested per task to something small, like 5000 (see the sketch below).
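
As a sketch of the second workaround, assuming the day's object keys happen to share distinguishable leading characters, each task could be given a narrower prefix list (the prefixes below are hypothetical):

"firehose" : {
  "type" : "static-s3",
  "prefixes" : [
    "s3://<bucket>/2018/05/07/15256",
    "s3://<bucket>/2018/05/07/15257"
  ]
}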

Jihoon

On Mon, May 14, 2018 at 1:15 PM, Chakravarthy varaga <chakrav...@gmail.com> wrote:

Chakravarthy varaga

May 14, 2018, 4:30:20 PM
to druid...@googlegroups.com
Hi Jihoon,

     Is there a configuration to set the max number of files for an indexing task?
     I'll try increasing the heap size. Does the peon need direct buffers at all? Right now, I have 1.5 GB off-heap and a 2 GB heap for each worker. It's not quite clear whether the peon uses off-heap memory!

Best Regards
Varaga


   


Jihoon Son

May 14, 2018, 5:06:34 PM
to druid...@googlegroups.com
Varaga,

> Is there a configuration to set the max number of files for an indexing task?

No, there isn't. You have to do it manually by specifying the prefixes appropriately.

> I'll try increasing the heap size. Does the peon need direct buffers at all? Right now, I have 1.5 GB off-heap and a 2 GB heap for each worker. It's not quite clear whether the peon uses off-heap memory!

Yes, the peon needs off-heap memory for query processing. If you don't use stream ingestion, you can minimize it by adjusting druid.processing.buffer.sizeBytes, druid.processing.numThreads, and druid.processing.numMergeBuffers. Please note that the amount of off-heap memory needed is druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1).
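
With the fork properties you posted (druid.processing.buffer.sizeBytes=136870912, numMergeBuffers=2, numThreads=2), that works out to:

136870912 * (2 + 2 + 1) = 684354560 bytes, i.e. roughly 650 MB of direct memory per peon.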

Jihoon

On Mon, May 14, 2018 at 1:30 PM, Chakravarthy varaga <chakrav...@gmail.com> wrote:


Chakravarthy varaga

May 14, 2018, 5:14:15 PM
to druid...@googlegroups.com
Thanks Jihoon,

    The prefix can't be set the way you describe, since regex isn't supported yet for the static-s3 firehose. Setting 'uris' is not an option either, since there are 14K files/day.

    I'm also planning to use the Kafka Indexing Service. I'm not sure if that falls under stream ingestion?

Varaga


Jihoon Son

May 14, 2018, 5:24:18 PM
to druid...@googlegroups.com
Varaga,

> The prefix can't be set the way you describe, since regex isn't supported yet for the static-s3 firehose. Setting 'uris' is not an option either, since there are 14K files/day.

The prefix filters input files by checking the prefix of each file's path. Suppose you have 3 files whose URIs are s3://some/path/file_1.csv, s3://some/path/file_1_1.csv, and s3://some/path/file_2.csv. You can ingest only s3://some/path/file_1.csv and s3://some/path/file_1_1.csv by setting the prefix to s3://some/path/file_1.
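
In firehose terms, that example would be:

"firehose" : {
  "type" : "static-s3",
  "prefixes" : ["s3://some/path/file_1"]
}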

> I'm also planning to use the Kafka Indexing Service. I'm not sure if that falls under stream ingestion?

Yes, the Kafka indexing service is a stream ingestion method.

Jihoon

On Mon, May 14, 2018 at 2:14 PM, Chakravarthy varaga <chakrav...@gmail.com> wrote:

Chakravarthy varaga

May 15, 2018, 12:52:50 PM
to druid...@googlegroups.com
Hi Jihoon,

    I increased the heap up to 16 GB, plus 1.5 GB for max direct buffers, with 5 GB of hard disk. It failed again with an OOM (heap space).
    Unfortunately, I couldn't use the prefixes, as our files are generated with <timeinMillis-Now>-<Hour-Minute-Sec> names.
    I've kind of hit a roadblock.

    An hprof was generated when the heap was at 8 GB. I didn't try to load it, as that would bog down my laptop.

Best Regards
Varaga


Jihoon Son

May 15, 2018, 4:10:08 PM
to druid...@googlegroups.com
Hi Varaga,

Would you please upload the heap dump file somewhere public, like Google Drive or S3?

Jihoon

On Tue, May 15, 2018 at 9:52 AM, Chakravarthy varaga <chakrav...@gmail.com> wrote:

Chakravarthy varaga

May 16, 2018, 12:19:43 PM
to druid...@googlegroups.com
Hi Jihoon,

    Here is the link to the hprof file in drive:


Jihoon Son

May 16, 2018, 2:59:15 PM
to druid...@googlegroups.com
Varaga,

From the heap dump file, the list of 'S3ObjectSummary's took most of your memory (98%). It looks like the size of the list was more than 16,000,000. Would you double-check how many files are under the path you specified?

Jihoon

On Wed, May 16, 2018 at 9:19 AM, Chakravarthy varaga <chakrav...@gmail.com> wrote:

Chakravarthy varaga

May 16, 2018, 5:34:15 PM
to druid...@googlegroups.com
Jihoon, thanks for your response on the heap dump.

   Let me check and get back to you on the number of files.

   Are there documented sizing characteristics and scale factors based on data sizes? I think it would be great to have documentation or guidance on the resources needed versus data sizes for indexing, segment cache sizes, and so on.

Best Regards
Varaga
