Need help with data ingestion from S3 and local files.

Sameer Paradkar

May 3, 2018, 5:27:11 PM5/3/18
to Druid User
Hi 

I am trying to ingest data both from an S3 bucket and a local filesystem (separately, using different ingestion config files and different data sources). I am using druid-0.12.0.

I have the following set up in common.runtime.properties in both the conf/druid/_common and conf-quickstart/druid/_common directories, and I have druid-s3-extensions in the loadList.
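
The loadList line I am referring to is something like this (any other extensions in the list are omitted here):

druid.extensions.loadList=["druid-s3-extensions"]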

# For S3:
druid.storage.type=s3
druid.storage.bucket=<S3 bucket>
druid.storage.baseKey=druid/segments
druid.s3.accessKey=<access key>
druid.s3.secretKey=<secret key>

But when I have the above, my S3 file data gets ingested properly, but I get the below exception when I try reading data from the local file system.

java.lang.Exception: java.io.IOException: No FileSystem for scheme: s3n
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) ~[hadoop-mapreduce-client-common-2.7.3.jar:?]
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529) [hadoop-mapreduce-client-common-2.7.3.jar:?]
Caused by: java.io.IOException: No FileSystem for scheme: s3n
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660) ~[hadoop-common-2.7.3.jar:?]
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667) ~[hadoop-common-2.7.3.jar:?] 


When I comment out the above S3 section in both common.runtime.properties files, the ingestion for the local file works as expected. Is this how it is supposed to be? Is there a way to have both work without making any changes to the common.runtime.properties file?

My S3 file ingestion config file looks like -

"ioConfig" : {
"type" : "index",
"firehose" : {
"type" : "static-s3",
"uris" : [
"s3://<bucket>/<file>",
"s3://
<bucket>/<file>",
"s3://<bucket>/<file>"

],
"prefixes" : []
},
"appendToExisting" : true
}

My local file ingestion config file looks like this -

"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "multi",
"children": [
{
"type" : "static",
"paths" : "<file>"
},
{
"type" : "static",
"paths" : "<file>"
},
{
"type" : "static",
"paths" : "<file>"
}
]
}
}

Can someone please let me know what I am doing wrong?

Thank you

Jihoon Son

May 3, 2018, 7:46:42 PM5/3/18
to druid...@googlegroups.com
Hi,

this error might occur if you're using the Hadoop index task and the proper configurations are not set in the jobProperties of your task spec. If you want to load data from your local file system, you can simply use the index task (http://druid.io/docs/latest/ingestion/tasks.html#index-task) as you did for S3. One difference is that you should use the localFirehose (http://druid.io/docs/latest/ingestion/firehose.html#localfirehose) instead of the StaticS3Firehose.
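
For reference, if you do stay with the Hadoop task, the jobProperties I mentioned would look something like this (these are the standard Hadoop s3n properties rather than anything Druid-specific; adjust the values for your environment):

    "jobProperties" : {
      "fs.s3n.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
      "fs.s3n.awsAccessKeyId" : "<access key>",
      "fs.s3n.awsSecretAccessKey" : "<secret key>"
    }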

Best,
Jihoon

On Thu, May 3, 2018 at 2:27 PM, Sameer Paradkar <learn.sa...@gmail.com> wrote:

Sameer Paradkar

May 3, 2018, 8:41:32 PM5/3/18
to Druid User
Thanks a lot for the reply. 

But my problem is that I am trying to do delta ingestion (I want the data in those three files to be appended, not overwritten, in the datasource, since they all might have similar timestamps). So I need to have type "multi". When I try changing the inputSpec to a firehose, I get the below error.

curl -X 'POST' -H 'Content-Type:application/json' -d @load_files.json localhost:8090/druid/indexer/v1/task
{"error":"Could not resolve type id 'multi' into a subtype of [simple type, class io.druid.data.input.FirehoseFactory]\n at [Source: HttpInputOverHTTP@6e3ef583[c=4612,q=1,[0]=EOF,s=STREAM]; line: 1, column: 124]"}

When I try changing type "multi" to "local", the curl command runs fine but an exception is thrown in the log while ingesting:

2018-05-04T00:39:16,002 ERROR [task-runner-0-priority-0] io.druid.indexing.overlord.ThreadPoolTaskRunner - Exception while running task[IndexTask{id=index_04132018_S3_2018-05-04T00:39:10.878Z, type=index, dataSource=04132018_S3}]
java.lang.NullPointerException
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:213) ~[guava-16.0.1.jar:?]
	at io.druid.segment.realtime.firehose.LocalFirehoseFactory.initObjects(LocalFirehoseFactory.java:83) ~[druid-server-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.AbstractTextFilesFirehoseFactory.connect(AbstractTextFilesFirehoseFactory.java:57) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.AbstractTextFilesFirehoseFactory.connect(AbstractTextFilesFirehoseFactory.java:46) ~[druid-api-0.12.0.jar:0.12.0

How should the config that reads the local file(s) look if I have to do delta ingestion? I would really appreciate your help.

Thank you.

Jihoon Son

May 3, 2018, 8:48:50 PM5/3/18
to druid...@googlegroups.com
The 'index' task also supports appending. Please check the appendToExisting option.

The below NPE is because of the missing 'baseDir' configuration.
You might want an ioConfig like below:

    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "local",
        "baseDir" : "/path/to/your/data/dir",
        "filter" : "regex-to-filter-data"
      },
      "appendToExisting": true
    }

Jihoon

On Thu, May 3, 2018 at 5:41 PM, Sameer Paradkar <learn.sa...@gmail.com> wrote:

Sameer Paradkar

May 3, 2018, 9:39:24 PM5/3/18
to Druid User
Thanks a lot Jihoon.
I changed my config like you said, and I see the data getting loaded properly without any errors, and also getting appended instead of overwritten.

Thanks a lot again.

Jihoon Son

May 4, 2018, 12:25:58 PM5/4/18
to druid...@googlegroups.com
Glad to hear your issue has been resolved!

Jihoon

On Thu, May 3, 2018 at 6:39 PM, Sameer Paradkar <learn.sa...@gmail.com> wrote:

Sameer Paradkar

May 19, 2018, 1:19:21 AM5/19/18
to Druid User
Hi Jihoon - I have one more question - can I use the local firehose like you suggested for Avro files?

I mean, is the below config valid? It is very important for me to use appendToExisting: true, so I was using a firehose instead of an inputSpec with type "static", an inputFormat, and paths. By doing this I am getting the below exception. Can you please suggest what I am doing wrong?

"ioConfig" : {
"type" : "index",
"firehose" : {
"type": "local",
        "inputFormat": "io.druid.data.input.avro.AvroValueInputFormat",
"baseDir" : "<directory>",
"filter" : "<someFile>.avro"
},
"appendToExisting" : true
}
......
......
"tuningConfig" : {
"type" : "index",
"partitionsSpec" : {
"type" : "hashed",
"targetPartitionSize" : 5000000
},
"jobProperties" : {
"avro.schema.input.value.path" : "<path>/<sameSchemaFile_Used_To_Convert_Parquet_To_Avro>.avsc"
}
}

The exception I am getting is -

java.lang.UnsupportedOperationException: makeParser not supported
	at io.druid.data.input.avro.AvroParseSpec.makeParser(AvroParseSpec.java:64) ~[?:?]
	at io.druid.data.input.impl.StringInputRowParser.initializeParser(StringInputRowParser.java:135) ~[druid-api-0.12.0.jar:0.12.0]
        at io.druid.data.input.impl.StringInputRowParser.startFileFromBeginning(StringInputRowParser.java:141) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.FileIteratingFirehose.getNextLineIterator(FileIteratingFirehose.java:91) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.data.input.impl.FileIteratingFirehose.hasMore(FileIteratingFirehose.java:67) ~[druid-api-0.12.0.jar:0.12.0]
	at io.druid.indexing.common.task.IndexTask.generateAndPublishSegments(IndexTask.java:660) ~[druid-indexing-service-0.12.0.jar:0.12.0]