Setting Up Enrich

Jeetu Choudhary

Apr 5, 2016, 6:24:40 AM
to Snowplow
I have created a Scala Stream Collector and store Thrift events to a Kinesis stream. Now I need to set up the enrichment process.
My question is: which enrichment process should I choose?

1. EmrEtlRunner, or
2. Stream Enrich

Is there any specific rule for that?

Thanks

Ihor Tomilenko

Apr 5, 2016, 11:29:12 AM
to Snowplow
Hi Jeetu,

The main difference between the two is:
  1. EmrEtlRunner is used in a batch pipeline. That is, you need to schedule periodic runs of the enrichment process and then load the data into a Redshift DB. That also implies you have to use the Kinesis S3 Sink to be able to deploy EmrEtlRunner.
  2. Stream Enrich is used to produce "real-time" data. That is, events sent to the stream are processed in real time (as opposed to a batch/periodic manner). The enriched data would be sent to Elasticsearch as its storage. That also implies using an additional Kinesis stream (to get the enriched data into) and the Kinesis Elasticsearch Sink (to send/sink the data to Elasticsearch).
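
Schematically, and only as a rough sketch of the two options just described, the flows look like this:

  Batch:     Collector -> raw Kinesis stream -> Kinesis S3 Sink -> S3 -> EmrEtlRunner (EMR) -> Redshift
  Real-time: Collector -> raw Kinesis stream -> Stream Enrich -> enriched Kinesis stream -> Kinesis Elasticsearch Sink -> Elasticsearch
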
The choice is yours.

Regards,
Ihor

Jeetu Choudhary

Apr 5, 2016, 12:13:35 PM
to Snowplow
Can you help me set up the Stream Enrich process? I don't understand how Iglu and the other schema-related components work.
My configuration file for enrichment is:

# Default Configuration for Scala Kinesis Enrich.

enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = "kinesis"

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  #    Using "sbt assembly" and "java -jar" is recommended to disable sbt
  #    logging.
  sink = "kinesis"

  # AWS credentials
  #
  # If both are set to 'cpf', a properties file on the classpath is used.
  # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/ClasspathPropertiesFileCredentialsProvider.html
  #
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  #
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    access-key: "********"
    secret-key: "********"
  }

  streams {
    in: {
      raw: "{{mystream-good}}"

      # After enrichment, events are accumulated in a buffer before being sent to Kinesis.
      # The buffer is emptied whenever:
      # - the number of stored records reaches record-limit or
      # - the combined size of the stored records reaches byte-limit or
      # - the time in milliseconds since it was last emptied exceeds time-limit when
      #   a new event enters the buffer
      buffer: {
        byte-limit: 500
        record-limit: 1
        time-limit: 500
      }
    }

    out: {
      enriched: "{{mystream-enriched}}"
      bad: "{{mystream-bad}}"

      # Minimum and maximum backoff periods
      # - Units: Milliseconds
      backoffPolicy: {
        minBackoff: 10
        maxBackoff: 100
      }
    }

    # "app-name" is used for a DynamoDB table to maintain stream state.
    # You can set it automatically using: "SnowplowKinesisEnrich-${enrich.streams.in.raw}"
    app-name: "{{enrichStreamsAppName}}"

    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # Note: This only affects the first run of this application
    # on a stream.
    initial-position = "TRIM_HORIZON"

    region: "{{my-region}}"
  }
}

Ihor Tomilenko

Apr 5, 2016, 1:23:19 PM
to Snowplow
Jeetu

Here's the standard flow you are after:

Raw Events Stream -> Stream Enrich -> Enriched Events Stream -> Kinesis Elasticsearch Sink -> Elasticsearch

The corresponding info can be found on the Snowplow wiki pages.

In short, the easiest way is to:
  1. Download the executable jarfiles. The (zip) archive contains:
    • Scala Stream Collector
    • Stream Enrich
    • Kinesis Elasticsearch Sink
  2. Configure Stream Enrich to have "kinesis" as both the source and the sink:
    • in:raw is the stream your Stream Collector sends the raw events to
    • out:enriched is the stream your enriched good events are sent to
    • out:bad is the stream your enriched bad events are sent to
  3. Run Stream Enrich on your own server. It will interact with Amazon Kinesis via the Amazon API (an example invocation is shown after the note below).
Note: if you want to process both enriched good and enriched bad events, you would have to launch two separate instances of Stream Enrich, one for each of the good and bad streams.
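
For step 3, the invocation could look something like this. The jar name, version, and file names here are placeholders, and the exact flags can vary between Stream Enrich releases, so check the wiki page for your version:

  ./snowplow-stream-enrich-0.8.0 --config my-enrich.hocon --resolver file:my-resolver.json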

Also, the buffer's standard values for production could be something like:
  • byte-limit: 4500000
  • record-limit: 500
  • time-limit: 5000
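
In config.hocon that corresponds to a buffer block like this (values taken directly from the list above):

  buffer: {
    byte-limit: 4500000
    record-limit: 500
    time-limit: 5000
  }
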
The resolver.json is the standard resolver file you would use in the Snowplow pipeline.
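
For reference, a minimal resolver.json that just points at the public Iglu Central repository looks like this (the cacheSize shown is a common default, not a requirement):

  {
    "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
    "data": {
      "cacheSize": 500,
      "repositories": [
        {
          "name": "Iglu Central",
          "priority": 0,
          "vendorPrefixes": [ "com.snowplowanalytics" ],
          "connection": {
            "http": {
              "uri": "http://iglucentral.com"
            }
          }
        }
      ]
    }
  }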

If you still have any questions, please be more specific.

Hopefully that helps.

--Ihor

Ihor Tomilenko

Apr 5, 2016, 1:30:58 PM
to Snowplow
Sorry, a small correction. I was talking about two separate instances of Kinesis Elasticsearch Sink (not Stream Enrich).

Jeetu Choudhary

Apr 5, 2016, 2:06:18 PM
to Snowplow
Thanks for the help. I am working on it; if anything comes up I will get in touch with you.

Jeetu Choudhary

Apr 6, 2016, 7:00:00 AM
to Snowplow

We are giving only the good stream as input. What about the bad stream data stored by the Stream Collector? And for the Enriched Events stream, do we have to create another stream?

Ihor Tomilenko

Apr 6, 2016, 1:05:43 PM
to Snowplow
Hi Jeetu,

I'm not quite following you. The Stream Collector (any collector for that matter) has no understanding of bad and good data. Its sole purpose is to gather all the events the trackers send to it.

Being a stream collector, it passes the events to a raw-events Kinesis stream. Stream Enrich processes the raw events, separates them into good and bad events, and sends them into the separate streams denoted as out:enriched and out:bad in the config.hocon configuration file.

So, yes, you need three Kinesis streams:
  • Raw Events stream
  • Enriched (good) Events stream
  • Bad Events stream
If you are not interested in analyzing bad events, then you would need only one Kinesis Elasticsearch Sink (for good events); the bad events will "die" in the Bad Events stream after one day, Kinesis' default retention period. Otherwise you would need yet another Kinesis Elasticsearch Sink for the bad events stream too.
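
Creating the streams themselves is a one-off task; with the AWS CLI, for example, it could look like this (the stream names and shard counts are placeholders you would adjust to your own setup):

  aws kinesis create-stream --stream-name raw-events --shard-count 1
  aws kinesis create-stream --stream-name enriched-events --shard-count 1
  aws kinesis create-stream --stream-name bad-events --shard-count 1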

Also, you might need yet another Kinesis stream for the events "rejected" by the Kinesis Elasticsearch Sink. Schematically, one could represent the flow like this:

Raw Events Stream -> Stream Enrich -+-> Enriched Events Stream -> ES Sink -+-> Elasticsearch
                                    |                                      +-> Out stream (errors)
                                    +-> Bad Events Stream ---------------> (ES Sink)

Regards,
Ihor

Jeetu Choudhary

Apr 7, 2016, 8:05:54 AM
to Snowplow
Yes, I totally agree with you:

stream {
  region: "us-east-1"
  good: "{NameOfGoodStream}"
  bad: "{NameOfBadStream}"
}

This is from the configuration file I passed at runtime to the Scala Stream Collector. In this file there are two streams:
1. for good raw events (NameOfGoodStream)
2. for bad raw events (NameOfBadStream)


Thanks