Need help getting events from kinesis to s3 to redshift

74 views
Skip to first unread message

Brandon Clark

unread,
Apr 6, 2016, 3:36:19 PM4/6/16
to Snowplow
I have snowplow configured using the following modules: scala-collector -> scala-kinesis-enrich -> scala-kinesis-s3-sink

My goal is to get the events into redshift. I understand that I need to use storage loader to make this happen but before I can use that the documentation says the events must be "shredded". Ok cool. The documentation says to use EmrETLRunner to do the shredding but it's a bit light on how to do that without going through the enrichment process. I'm already enriching the events with the kinesis module so I don't need Emr to do that, I only need the events to be shredded and moved into another folder for use by the storage-loader. I think I have everything configured to just that using --skip to skip the steps that I don't need. I've watched my Emr cluster spin up and attempt to do the jobs but the Elasticity S3DistCp Step: Enriched S3 -> HDFS step always fails with the following error:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://<ipaddress>:8020/tmp/8cc67ab2-46c5-43de-9d94-0e769e5f5b7a/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

I can see the InvalidInputException which leads me to believe its an issue with my s3 paths. I feel that the fault is mine in not understanding the implementation details of the individual steps so I could use some hand holding there.

I am running Emr with the following command: snowplow-emr-etl-runner -d --config /etc/snowplow/config.yml -r /etc/snowplow/iglu_resolver.json --skip staging,enrich,archive_raw

Are my flags correct for what i'm trying to accomplish?

FYI The kinesis-s3-sink is depositing my enriched events into s3://<bucket>/events/enriched

Here is my config.yml (with sensitive data omitted):
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXXXXXXXXXXXXXXxx
  secret_access_key: XXXXXXXXXXXXXXXX
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://<bucket>/log
      raw:
        in:                  # Multiple in buckets are permitted
          - s3://<bucket>/events/raw          # e.g. s3://my-in-bucket
processing: s3://<bucket>/events/processing
archive: s3://<bucket>/events/archived/raw # e.g. s3://my-archive-bucket/in
enriched: good: s3://<bucket>/events/enriched # e.g. s3://my-out-bucket/enriched/good
bad: s3://<bucket>/events/bad/enriched # e.g. s3://my-out-bucket/enriched/bad
errors: # Leave blank unless continue_on_unexpected_error set to true below archive: s3://<bucket>/events/archived/enriched # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
shredded: good: s3://<bucket>/events/shredded # e.g. s3://my-out-bucket/shredded/good
bad: s3://<bucket>/events/bad/shredded # e.g. s3://my-out-bucket/shredded/bad
errors: # Leave blank unless continue_on_unexpected_error set to true below archive: s3://<bucket>/events/archived/shredded # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
emr: ami_version: 4.3.0 # Don't change this region: us-east-1 # Always set this jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles placement: us-east-1b # Set this if not running in VPC. Leave blank otherwise ec2_subnet_id: subnet-e66207cd # Set this if running in VPC. Leave blank otherwise ec2_key_name: snowplow bootstrap: [] # Set this to specify custom boostrap actions. Leave empty otherwise software: hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise. lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise. # Adjust your Hadoop cluster below jobflow: master_instance_type: m1.medium core_instance_count: 2 core_instance_type: m1.medium task_instance_count: 0 # Increase to use spot instances task_instance_type: m1.medium task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures additional_info: # Optional JSON string for selecting additional features collectors: format: thrift # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs enrich: job_name: Snowplow ETL # Give your job a name versions: hadoop_enrich: 1.6.0 # Version of the Hadoop Enrichment process hadoop_shred: 0.8.0 # Version of the Hadoop Shredding process hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process continue_on_unexpected_error: false # Set to 'true' (and set out_errors: above) if you don't want any exceptions thrown from ETL output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP storage: download: folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift targets: - name: "Snowplow Redshift" type: redshift host: <redshifthost> # The endpoint as shown in the Redshift console database: snowplow # Name of database port: 5439 # Default Redshift port table: atomic.events username: <username> password: <password> maxerror: 1 # Stop loading on first error, or increase to permit more load errors comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified ssl_mode: disable monitoring: tags: {} # Name-value pairs describing this job logging: level: DEBUG # You can optionally switch to INFO for production snowplow: method: get app_id: dt # e.g. snowplow collector: <collectorhost> # e.g. d3rkrsqld9gmqf.cloudfront.net



Many thanks in advance

Ihor Tomilenko

unread,
Apr 6, 2016, 3:53:57 PM4/6/16
to Snowplow
Hi Brandon,

We no longer actively support this group. Instead, we ask to post any new topics/discussion to our newly created forum here: http://discourse.snowplowanalytics.com/

Could you, please, re-post this topic on Discourse and we will be happy to assist you.

Regards,
Ihor

Brandon Clark

unread,
Apr 6, 2016, 4:08:51 PM4/6/16
to Snowplow
Yes I just noticed that. I have reposted on discourse. Thanks.
Reply all
Reply to author
Forward
0 new messages