snowplow-emr-etl-runner shredding

Daniel Roythorne

Sep 17, 2015, 7:08:19 AM
to Snowplow
Hi,

I've been setting up the Snowplow pipeline to collect telemetry data and have run into difficulties with the shredding part of the ETL runner. Any pointers would be really appreciated.

Everything runs smoothly through the Python Tracker, Scala Kinesis Collector and snowplow-kinesis-s3 sink, right up to the "Elasticity Scalding Step: Shred Enriched Events" step of the EMR job.

Everything sinks to 'bad', and each record looks like this:

{"line":"cooltura-0.0.1-001\tmob\t2015-09-16 15:42:44.671\t2015-09-16 15:22:11.229\t1970-01-17 16:40:16.455\tstruct\td29e4daf-66ef-4939-8326-03d21d0bab2d\t\tdefault\tpy-0.7.2\tssc-0.5.0-kinesis\thadoop-1.0.0-common-0.14.0\t28\t84.45.69.x\t\t\t\t65590e33-2e3f-4a6c-8604-9a85747120d1\tGB\t\t\t\t51.5\t-0.13000488\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t3\t2\tlala\t72x35\t1\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tpython-requests/2.2.1 CPython/2.7.6 Linux/3.13.0-53-generic\tUnknown\tUnknown\t\tunknown\tOTHER\ten\t\t\t\t\t\t\t\t\t\t\t\t72\t35\tLinux\tLinux\tOther\tEurope/London\tComputer\t0\t\t\t\t\t\t\t\t\t\t\t\tUSD\tEurope/London\t\t\t\t\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0\",\"data\":{\"useragentFamily\":\"Python Requests\",\"useragentMajor\":\"2\",\"useragentMinor\":\"2\",\"useragentPatch\":null,\"useragentVersion\":\"Python Requests 2.2\",\"osFamily\":\"Linux\",\"osMajor\":\"3\",\"osMinor\":\"13\",\"osPatch\":null,\"osPatchMinor\":null,\"osVersion\":\"Linux 3.13\",\"deviceFamily\":\"Other\"}}]}\t\t","errors":[{"level":"error","message":"Could not find schema with key iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1 in any repository, tried:","repositories":["Iglu Client Embedded [embedded]","Tagcloud Analytics [HTTP]","Iglu Central [HTTP]"]}]}

After first running the job with the shred and archive steps skipped (so that enrichment completes successfully), I call the shredding process with:

bundle exec bin/snowplow-emr-etl-runner -S s3://tagcloud-analytics-etl-out/good/run=2015-09-16-15-42-44 --config config/config.yml --resolver config/resolver.json --enrichments ../config/enrichments

with

config.yml:

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: XXXXXXXXXXXXXXXXXXXXX
  secret_access_key: XXXXXXXXXXXXXXXXXXXX
  s3:
    region: eu-west-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://tagcloud-analytics-etl-log
      raw:
        in:                  # Multiple in buckets are permitted
          - s3://tagcloud-analytics-etl-in          # e.g. s3://my-archive-bucket/raw
        processing: s3://tagcloud-analytics-etl-processing
        archive: s3://tagcloud-analytics-etl-archive
      enriched:
        good: s3://tagcloud-analytics-etl-out/good # e.g. s3://my-out-bucket/en
        bad: s3://tagcloud-analytics-etl-out/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://tagcloud-analytics-etl-out/error     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://tagcloud-analytics-etl-out/archive ## Where to archive enriched events to, e.g. s3://my-out-bucket/enriched/archive
      shredded:
        good: s3://tagcloud-analytics-etl-shredded/good # e.g. s3://my-shredded-bucket/en
        bad: s3://tagcloud-analytics-etl-shredded/bad # e.g. s3://my-shredded-bucket/enriched/bad
        errors: s3://tagcloud-analytics-etl-shredded/error     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://tagcloud-analytics-etl-shredded/archive ## Where to archive enriched events to, e.g. s3://my-shredded-bucket/enriched/archive
  emr:
    ami_version: 3.9.0      # Don't change this
    region: eu-west-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-79f7271c # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: tagcloud-emr
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: "0.94.18"                # To launch on cluster, provide version, "0.92.0", keep quotes
      lingual: "1.1"             # To launch on cluster, provide version, "1.1", keep quotes
    # Adjust your Hadoop cluster below
    jobflow:
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
collectors:
  format: thrift # Or 'clj-tomcat' for the Clojure Collector, or 'thrift' for Thrift records, or 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs
enrich:
  job_name: Snowplow ETL # Give your job a name
  versions:
    hadoop_enrich: 1.0.0 # Version of the Hadoop Enrichment process
    hadoop_shred: 0.4.0 # Version of the Hadoop Shredding process
  continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  download:
    folder: # Postgres-only config option. Where to store the downloaded files. Leave blank for Redshift
  targets:
    - name: "tagcloud-snowplow-storage"
      type: redshift
      database: snowplow # Name of database
      port: 5439 # Default Redshift port
      table: atomic.events
      username: storageloader
      password: Kg1CYvW5Rp
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production


resolver.json:

{
        "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
        "data": {
                "cacheSize": 500,
                "repositories": [
                        {
                                "name": "Iglu Central",
                                "priority": 0,
                                "vendorPrefixes": ["com.snowplowanalytics"],
                                "connection": {
                                        "http": {
                                                "uri": "http://iglucentral.com"
                                        }
                                }
                        },
                        {
                                "name": "Tagcloud Analytics",
                                "priority": 5,
                                "vendorPrefixes": ["com.snowplowanalytics"],
                                "connection": {
                                        "http": {
                                                "uri": "http://iglu.analytics.tagcloud.tetrahedra.eu"
                                        }
                                }
                        }
                ]
        }
}

I can obtain 'com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1' from both Iglu Central and the Tagcloud repository using wget on the core instance, so the subnets and security groups don't look to be the issue.
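That is, fetching the schema at the standard Iglu static-repository path (assuming the Tagcloud repo follows the same /schemas/<vendor>/<name>/<format>/<version> layout that Iglu Central uses):

wget http://iglucentral.com/schemas/com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1
wget http://iglu.analytics.tagcloud.tetrahedra.eu/schemas/com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1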

Everything in the pipeline is from commit 64a789273047d33c186cf00e2a0d8aabe52339f3 (feature/r70 merge) and deployed on AWS Ubuntu 14.04 instances.

Thanks,
Dan




Alex Dean

Sep 17, 2015, 8:01:24 AM
to Snowplow
Hey Daniel,

Everything looks okay - except it looks like you have duplicated the contents of Iglu Central into your own repo, which is not standard practice. In theory it shouldn't make a difference, but it's the first thing I would revert. Can you remove that duplication (either delete the contents of Iglu Central from the Tagcloud repo, or remove the Tagcloud repo reference) and try again? Let us know if that fixes the issue.

Thanks,

Alex




--
Co-founder
Snowplow Analytics
The Roma Building, 32-38 Scrutton Street, London EC2A 4RQ, United Kingdom
+44 (0)203 589 6116
+44 7881 622 925
@alexcrdean

Daniel Roythorne

Sep 17, 2015, 9:52:47 AM
to Snowplow
Hi Alex,

Thank you for getting back to me so quickly. I removed the Tagcloud Iglu endpoint, but I get the same error:

{"level":"error","message":"Could not find schema with key iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1 in any repository, tried:","repositories":["Iglu Client Embedded [embedded]","Iglu Central [HTTP]"]}

Daniel Roythorne

Sep 17, 2015, 9:56:51 AM
to Snowplow
I should add that it's the "Elasticity S3DistCp Step: Shredded HDFS -> S3" step that fails with:

Exception in thread "main" java.lang.RuntimeException: Failed to get source file system
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:739)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:720)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs:/local/snowplow/shredded-events
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1128)
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:736)
	... 9 more

... which I assume is because everything was written to the bad sink in the previous step. Might it be worth not failing the entire job in this case, though?
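(For what it's worth, whether the shred step left anything on HDFS can be checked from the master node with something like:

hadoop fs -ls /local/snowplow/shredded-events

which in this case should report that the directory does not exist.)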

Fred Blundun

Sep 17, 2015, 11:14:50 AM
to snowpl...@googlegroups.com
Hi Daniel,

You're right - S3DistCp fails if it has no input. In release 73 we are planning to have the shred job copy the enriched events from the enriched bucket to the shredded bucket. This should clear up the problem, since there will then always be something to copy.

Could you try rerunning using ami_version: 3.6.0 in your config.yml? That's what worked in this thread: https://groups.google.com/forum/#!topic/snowplow-user/cV_7pd178o8
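That is, reverting the edited line in the emr section of config.yml:

emr:
  ami_version: 3.6.0      # Don't change this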

Regards,
Fred


Alex Dean

Sep 17, 2015, 12:00:44 PM
to Snowplow
Ah yes - you edited this line in the config:

ami_version: 3.6.0 # Don't change this

and changed it to 3.9.0. It's worth following the guidance in that config file to prevent problems...

A

Gmail (d)

Sep 17, 2015, 1:17:29 PM
to snowpl...@googlegroups.com
Hi,

That's done it. I originally upped the AMI and HBase versions to fix a problem with HBase not starting up in an earlier release.

Thanks for the help, and I'm slightly embarrassed it turned out to be so simple.

Cheers,
Dan



Alex Dean

Sep 17, 2015, 1:19:08 PM
to snowpl...@googlegroups.com

Great - glad that fixed it!

A
