Logging bad rows in Elasticsearch


Joao Correia

Apr 4, 2016, 2:44:20 PM
to Snowplow
Hi Snowplowers,

I'm new to Elasticsearch. I followed the steps in the tutorial, but the events aren't being sent to ES. I created the index named snowplow.

I tried running with `sources` blank, but I don't see any data being loaded. This is my configuration file:

  targets:
    - name: "Joao Redshift Database"
      type: redshift
      host: xxxxxxxxxxxx.redshift.amazonaws.com # The endpoint as shown in the Redshift console
      database: xxxxxxxxxx # Name of database
      port: 5439 # Default Redshift port
      ssl_mode: disable # One of disable (default), require, verify-ca or verify-full
      table: atomic.events
      username: xxxxxxx
      password: xxxxxx
      maxerror: 1 # Stop loading on first error, or increase to permit more load errors
      comprows: 200000 # Default for a 1 XL node cluster. Not used unless --include compupdate specified
    - name: "Elastic Search"
      type: elasticsearch
      host: "xxxxxxxxx.us-west-2.es.amazonaws.com"
      database: snowplow       # The Elasticsearch index
      port: 80                 # Port used to connect to Elasticsearch
      table: bad_rows          # The Elasticsearch type
      es_nodes_wan_only: true  # Set to true if using Amazon Elasticsearch Service
      # Leave blank or specify: ["s3://out/enriched/bad/run=xxx", "s3://out/shred/bad/run=yyy"]
      sources:
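
For reference, I invoke the runner roughly like this (the config path is a placeholder, not my real one):

    ./snowplow-emr-etl-runner --config config/config.yml --skip staging,s3distcp,enrich,shred,archive_raw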



I even specified an S3 bucket with bad rows and `--skip staging,s3distcp,enrich,shred,archive_raw`, but I get this error:

Snowplow::EmrEtlRunner::EmrExecutionError (EMR jobflow j-3U6D4HV9GSA0V failed, check Amazon EMR console and Hadoop logs for details (help: https://github.com/snowplow/snowplow/wiki/Troubleshooting-jobs-on-Elastic-MapReduce). Data files not archived.

Snowplow ETL: TERMINATING [STEP_FAILURE] ~ elapsed time n/a [2016-04-04 11:28:57 -0700 - ]

 - 1. Elasticity Scalding Step: Errors in s3://xxxxxxx/snowplow-enriched/bad/run=2016-04-04-10-01-32 -> Elasticsearch: Elastic Search: FAILED ~ 00:03:48 [2016-04-04 11:29:03 -0700 - 2016-04-04 11:32:52 -0700]):

    /snowplow/snowplow-emr-etl-runner!/emr-etl-runner/lib/snowplow-emr-etl-runner/emr_job.rb:471:in `run'

    /snowplow/snowplow-emr-etl-runner!/gems/contracts-0.7/lib/contracts/method_reference.rb:46:in `send_to'

    /snowplow/snowplow-emr-etl-runner!/gems/contracts-0.7/lib/contracts.rb:305:in `call_with'

    /snowplow/snowplow-emr-etl-runner!/gems/contracts-0.7/lib/contracts/decorators.rb:159:in `common_method_added'

    /snowplow/snowplow-emr-etl-runner!/emr-etl-runner/lib/snowplow-emr-etl-runner/runner.rb:68:in `run'

    /snowplow/snowplow-emr-etl-runner!/gems/contracts-0.7/lib/contracts/method_reference.rb:46:in `send_to'

    /snowplow/snowplow-emr-etl-runner!/gems/contracts-0.7/lib/contracts.rb:305:in `call_with'

    /snowplow/snowplow-emr-etl-runner!/gems/contracts-0.7/lib/contracts/decorators.rb:159:in `common_method_added'

    file:/snowplow/snowplow-emr-etl-runner!/emr-etl-runner/bin/snowplow-emr-etl-runner:39:in `(root)'

    org/jruby/RubyKernel.java:1091:in `load'

    file:/snowplow/snowplow-emr-etl-runner!/META-INF/main.rb:1:in `(root)'

    org/jruby/RubyKernel.java:1072:in `require'

    file:/snowplow/snowplow-emr-etl-runner!/META-INF/main.rb:1:in `(root)'

    /tmp/jruby7648259994710175487extract/jruby-stdlib-1.7.20.1.jar!/META-INF/jruby.home/lib/ruby/shared/rubygems/core_ext/kernel_require.rb:1:in `(root)'




stderr:

Exception in thread "main" cascading.flow.FlowException: step failed: (1/1) snowplow/bad_rows, with job id: job_1459794457549_0001, please see cluster logs for failure messages
	at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:221)
	at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
	at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

syslog:

2016-04-04 18:29:22,676 INFO com.amazon.ws.emr.hadoop.fs.EmrFileSystem (main): Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
2016-04-04 18:29:23,085 INFO amazon.emr.metrics.MetricsSaver (main): MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1459794467121 
2016-04-04 18:29:23,086 INFO amazon.emr.metrics.MetricsSaver (main): Created MetricsSaver j-3U6D4HV9GSA0V:i-d398d50b:RunJar:05768 period:60 /mnt/var/em/raw/i-d398d50b_20160404_RunJar_05768_raw.bin
2016-04-04 18:29:24,964 INFO cascading.flow.hadoop.util.HadoopUtil (main): resolving application jar from found main method on: com.snowplowanalytics.snowplow.storage.hadoop.JobRunner$
2016-04-04 18:29:24,967 INFO cascading.flow.hadoop.planner.HadoopPlanner (main): using application jar: /mnt/var/lib/hadoop/steps/s-3AN2Z0DAJL6OV/hadoop-elasticsearch-sink-0.1.0.jar
2016-04-04 18:29:24,985 INFO cascading.property.AppProps (main): using app.id: 0BCE7CB17B3447E4A1728D0ED5DB5915
2016-04-04 18:29:25,974 INFO org.elasticsearch.hadoop.cascading.EsTap (main): Elasticsearch Hadoop v2.2.0.BUILD-SNAPSHOT [a51c2f7f94] initialized
2016-04-04 18:29:26,078 INFO org.apache.hadoop.conf.Configuration.deprecation (main): mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
2016-04-04 18:29:26,147 INFO org.apache.hadoop.conf.Configuration.deprecation (main): mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
2016-04-04 18:29:26,318 INFO cascading.util.Version (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): Concurrent, Inc - Cascading 2.6.0
2016-04-04 18:29:26,324 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....] starting
2016-04-04 18:29:26,325 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....]  source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["s3://xxxxxxxx/snowplow-enriched/bad/run=2016-04-04-10-01-32"]
2016-04-04 18:29:26,325 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....]  sink: EsHadoopTap["EsHadoopScheme[[UNKNOWN]->['output']]"]["snowplow/bad_rows"]
2016-04-04 18:29:26,326 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....]  parallel execution is enabled: true
2016-04-04 18:29:26,326 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....]  starting jobs: 1
2016-04-04 18:29:26,326 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....]  allocating threads: 1
2016-04-04 18:29:26,328 INFO cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] starting step: (1/1) snowplow/bad_rows
2016-04-04 18:29:26,458 INFO org.apache.hadoop.yarn.client.RMProxy (pool-4-thread-1): Connecting to ResourceManager at ip-172-31-25-157.us-west-2.compute.internal/172.31.25.157:8032
2016-04-04 18:29:26,897 INFO org.apache.hadoop.yarn.client.RMProxy (pool-4-thread-1): Connecting to ResourceManager at ip-172-31-25-157.us-west-2.compute.internal/172.31.25.157:8032
2016-04-04 18:29:26,912 WARN org.elasticsearch.hadoop.mr.EsOutputFormat (pool-4-thread-1): Speculative execution enabled for reducer - consider disabling it to prevent data corruption
2016-04-04 18:29:28,015 INFO amazon.emr.metrics.MetricsSaver (DataStreamer for file /tmp/hadoop-yarn/staging/hadoop/.staging/job_1459794457549_0001/job.jar block BP-1545714932-172.31.25.157-1459794425670:blk_1073741829_1005): 1 aggregated HDFSWriteDelay 684 raw values into 1 aggregated values, total 1
2016-04-04 18:29:28,440 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (pool-4-thread-1): Loaded native gpl library
2016-04-04 18:29:28,447 INFO com.hadoop.compression.lzo.LzoCodec (pool-4-thread-1): Successfully loaded & initialized native-lzo library [hadoop-lzo rev 02f444f0932ea7710dcc4bcdc1aa7ca55adf48c9]
2016-04-04 18:29:28,506 INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem (pool-4-thread-1): listStatus s3://xxxxxxx/snowplow-enriched/bad/run=2016-04-04-10-01-32 with recursive false
2016-04-04 18:29:28,763 INFO org.apache.hadoop.mapred.FileInputFormat (pool-4-thread-1): Total input paths to process : 12
2016-04-04 18:29:30,773 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-4-thread-1): number of splits:16
2016-04-04 18:29:31,189 INFO org.apache.hadoop.mapreduce.JobSubmitter (pool-4-thread-1): Submitting tokens for job: job_1459794457549_0001
2016-04-04 18:29:31,759 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (pool-4-thread-1): Submitted application application_1459794457549_0001
2016-04-04 18:29:31,915 INFO org.apache.hadoop.mapreduce.Job (pool-4-thread-1): The url to track the job: http://ip-172-31-25-157.us-west-2.compute.internal:20888/proxy/application_1459794457549_0001/
2016-04-04 18:29:31,917 INFO cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] submitted hadoop job: job_1459794457549_0001
2016-04-04 18:29:31,917 INFO cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] tracking url: http://ip-172-31-25-157.us-west-2.compute.internal:20888/proxy/application_1459794457549_0001/
2016-04-04 18:29:56,565 INFO cascading.util.Update (UpdateRequestTimer): newer Cascading release available: 2.6.3
2016-04-04 18:32:42,584 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] hadoop job job_1459794457549_0001 state at FAILED
2016-04-04 18:32:42,587 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] failure info: Task failed task_1459794457549_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

2016-04-04 18:32:42,630 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] task completion events identify failed tasks
2016-04-04 18:32:42,630 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] task completion events count: 10
2016-04-04 18:32:42,632 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000000_0, Status : FAILED
2016-04-04 18:32:42,632 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000002_0, Status : FAILED
2016-04-04 18:32:42,632 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000004_0, Status : FAILED
2016-04-04 18:32:42,632 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000006_0, Status : FAILED
2016-04-04 18:32:42,632 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000007_0, Status : FAILED
2016-04-04 18:32:42,632 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000001_0, Status : FAILED
2016-04-04 18:32:42,633 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000005_0, Status : FAILED
2016-04-04 18:32:42,633 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000003_0, Status : FAILED
2016-04-04 18:32:42,633 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000008_0, Status : FAILED
2016-04-04 18:32:42,633 WARN cascading.flow.FlowStep (pool-4-thread-1): [com.snowplowanalytics....] event = Task Id : attempt_1459794457549_0001_m_000009_0, Status : FAILED
2016-04-04 18:32:42,638 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....] stopping all jobs
2016-04-04 18:32:42,639 INFO cascading.flow.FlowStep (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....] stopping: (1/1) snowplow/bad_rows
2016-04-04 18:32:42,642 INFO cascading.flow.Flow (flow com.snowplowanalytics.snowplow.storage.hadoop.ElasticsearchJob): [com.snowplowanalytics....] stopped all jobs

Alex Dean

Apr 4, 2016, 8:18:09 PM
to Snowplow
Hi Joao,

It looks like you are using Amazon Elasticsearch Service - how have you configured access to that ES cluster?

Thanks,

Alex


--
Co-founder
Snowplow Analytics
The Roma Building, 32-38 Scrutton Street, London EC2A 4RQ, United Kingdom
+44 (0)203 589 6116
+44 7881 622 925
@alexcrdean

Joao Correia

Apr 5, 2016, 1:05:33 PM
to Snowplow
Hi Alex,

Your question made me realize what the problem was. I created the cluster with access for the IP address where I'm running the EMR ETL process, but not for the EMR machines, since I don't know their IPs.

I then reconfigured ES to be completely open and it worked. Maybe the documentation could explain why it needs to be completely open.

How could Elasticsearch access be secured?


Thanks
Joao

jo...@snowplowanalytics.com

Apr 6, 2016, 4:33:59 AM
to Snowplow
Hi Joao,

The issue with the Amazon Elasticsearch Service at the moment is that you cannot place it within your own VPC. As such, if you run EMR in a public subnet you need to dynamically whitelist all of the IPs being used by EMR, or point EMR at a proxy that is already whitelisted. You cannot know the CIDR ranges ahead of time, or easily control access, until AWS allows this.

---
Option 1

The simplest and safest solution to this is to start running EMR inside a private subnet. This means that all egress traffic from EMR will be routed through one or more NAT instances, so you will only need to whitelist the NAT IP address(es) in the Amazon Elasticsearch Service. For an explanation of private subnets and how to set them up, see: http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/VPC_Scenario2.html
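
Once you know the NAT's IP address(es), the domain access policy only needs to whitelist those. A minimal sketch of such a policy (the region, account ID, domain name and IP below are placeholders, not real values):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": { "AWS": "*" },
          "Action": "es:*",
          "Resource": "arn:aws:es:us-west-2:123456789012:domain/snowplow/*",
          "Condition": { "IpAddress": { "aws:SourceIp": "198.51.100.10/32" } }
        }
      ]
    }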

To get this working you will need to:

1. Set up a VPC with a public and a private subnet
2. Update your config to use the ID of your new private subnet (see the sketch after this list)
3. Re-create the default EMR IAM roles using `aws emr create-default-roles`, as new permissions are required
  - You might need to delete the current roles in IAM first
4. Update your versions to the following:
  - ami_version: "4.3.0"
  - hadoop_enrich: "1.6.0"
  - hadoop_shred: "0.8.0"
  - hadoop_elasticsearch: "0.1.0"
5. Ensure the network ACLs for both your public and private subnets allow ALL ICMP for both ingress and egress (required for communication between the EMR master and workers).
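
In the config these changes look roughly like this (a sketch only: exact nesting can vary between releases, and the subnet ID is a placeholder):

    aws:
      emr:
        ami_version: "4.3.0"
        ec2_subnet_id: subnet-xxxxxxxx  # the new private subnet
    enrich:
      versions:
        hadoop_enrich: "1.6.0"
        hadoop_shred: "0.8.0"
        hadoop_elasticsearch: "0.1.0"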

EMR should then run happily inside the private subnet. If you want to test the connectivity before running a job, simply start an instance in your new private subnet and attempt to `curl http://< service endpoint >`; you should get a 200 response.
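
For example, from an instance inside the new private subnet (keeping the endpoint placeholder):

    # Print just the HTTP status; 200 means the NAT route and the ES whitelist both work
    curl -s -o /dev/null -w "%{http_code}\n" "http://< service endpoint >"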

---
Option 2

You can set up another instance to act as a proxy for EMR traffic, so you need only whitelist the instance being used as the proxy (essentially what the NAT ends up doing in the previous option). This guide has more information on how to configure it all: https://eladnava.com/secure-aws-elasticsearch-service-behind-vpc/
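
With that in place, the Elasticsearch target in the EmrEtlRunner config simply points at the proxy rather than at the ES endpoint directly. A sketch, assuming the proxy listens on port 80 (the IP is a placeholder):

    - name: "Elastic Search"
      type: elasticsearch
      host: "10.0.0.5"         # private IP of the proxy instance
      database: snowplow       # The Elasticsearch index
      port: 80                 # The port the proxy listens on
      table: bad_rows          # The Elasticsearch type
      es_nodes_wan_only: true
      sources: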

---

Hope that helps!

Josh