convert EnrichedEvent bean to TSV


Carlo Scarioni

Apr 5, 2016, 11:35:27 AM
to Snowplow
Hi, 

I have spent most of today trying to figure out a way to convert an EnrichedEvent object to TSV.

What we want to achieve is the following:

We are running Snowplow enrichments through a couple of Spark Streaming jobs. The first application reads from Kinesis, applies the different enrichments we need, and writes the JSON-serialized enriched event to a second Kinesis stream.

The second streaming job reads the JSON enriched event from Kinesis, deserializes it into an `EnrichedEvent` object, applies Snowplow's shredding, and loads the data into Redshift through custom-assembled COPY commands.

All that works great for the `context` tables. However, I can't seem to do the same for `atomic.events`.

I know that `atomic.events` is loaded from S3 into Redshift as TSV files (instead of JSON), but I can't find a way to convert my EnrichedEvent to TSV so that I can upload it to S3. I found the code that does this (I think) here: https://github.com/snowplow/snowplow/blob/master/3-enrich/scala-hadoop-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich.hadoop/EtlJob.scala#L199 using Scalding. But I can't quite understand it, or see how to do something similar in my use case.

Is there a way to achieve this (EnrichedEvent -> TSV)? Is there a better way to approach the problem?

Also, not sure if it would actually be better to have `atomic.events` populated from JSON as well, instead of TSV?

Thanks

Carlo

Alex Dean

Apr 6, 2016, 4:10:57 AM
to Snowplow
Hi Carlo,

That architecture sounds like a lot of reinventing the wheel... Is there a reason why you can't instead:
  1. Run our Scala Stream Enrich (a KCL job) on the raw Kinesis stream. That will produce Snowplow enriched events in the regular format. If you need custom enrichments, you can write them in JavaScript using the JavaScript Scripting Enrichment. If you need to write your enrichments in another language, e.g. JRuby or Java - PR welcome!
  2. Add your Spark Streaming job downstream of that, to perform the shredding and the load into Redshift. (The need for that job will hopefully go away when we release our Redshift dripfeeder for Kinesis, which is in the works.)

Remember that Spark Streaming uses the KCL under the hood, so there is no ops simplification from using a Spark Streaming job over Scala Stream Enrich... Generally the closer you stay to the reference Snowplow architecture, the more you will get "for free" as we continue to roll out more and more functionality over the coming months and years.

> Also, not sure if it would actually be better to have `atomic.events` populated from JSON as well, instead of TSV?

Yes, our plan is to replace the EnrichedEvent format with an Avro schema over time. Our Scala Analytics SDK is the first step in providing an abstraction layer/facade over the enriched event to make these upgrades easier: https://github.com/snowplow/snowplow-scala-analytics-sdk

Cheers,

Alex






--
Co-founder
Snowplow Analytics
The Roma Building, 32-38 Scrutton Street, London EC2A 4RQ, United Kingdom
+44 (0)203 589 6116
+44 7881 622 925
@alexcrdean

Carlo Scarioni

Apr 6, 2016, 5:06:54 AM
to Snowplow
Hi Alex, thanks a lot for the reply

By the way, I am writing from Simply Business, so you might already be a bit familiar with our model, mainly from talking with Dani.

- Regarding question number 1: although I understand what you say regarding "staying closer to the reference Snowplow architecture...", we do have our strategic goals of using Spark for our real time event and analysis pipelines. This currently gives us a couple of advantages in the way we manage and monitor the applications, and also in the way we define our enrichment pipeline as a standard Spark RDD DAG. So now, for example, I go to my Spark UI and I can nicely see my 2 long running streaming Jobs running there and their event rate, etc.

- On question number 2: that is what we have, a job that reads from the Kinesis stream. The thing is that we store the event in Kinesis as JSON, and from that we deserialize it back into an EnrichedEvent. So I wanted to know if there was a way to convert it back to TSV. I have currently "solved" the problem by creating a corresponding `jsonpath` file for `atomic_events` and copying the data into Redshift as JSON. That seems to be working now, and given your comment on "migrating to Avro", I guess this is really not a bad solution after all. Would you agree?
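
For anyone trying the same workaround, here is a rough sketch of what it could look like: build a JSONPaths file with one path per column, in table column order, then point a Redshift `COPY ... FORMAT AS JSON` at it. This is only an illustration, not our production code. `JsonPathsSketch`, the S3 URIs and the IAM role are hypothetical placeholders, and the column list would have to match your actual `atomic.events` definition.

```java
import java.util.ArrayList;
import java.util.List;

class JsonPathsSketch {
    // Builds the body of a Redshift JSONPaths file: one path per target column,
    // listed in the same order as the columns of the target table.
    static String jsonPaths(List<String> columns) {
        List<String> paths = new ArrayList<>();
        for (String column : columns) {
            paths.add("\"$['" + column + "']\"");
        }
        return "{\"jsonpaths\": [" + String.join(", ", paths) + "]}";
    }

    // Assembles a COPY statement that loads JSON objects through that JSONPaths file.
    // Assumption: all arguments are trusted placeholders; nothing is escaped here.
    static String copyCommand(String table, String dataUri, String jsonPathsUri, String iamRole) {
        return "COPY " + table
            + " FROM '" + dataUri + "'"
            + " IAM_ROLE '" + iamRole + "'"
            + " FORMAT AS JSON '" + jsonPathsUri + "'";
    }
}
```

The JSONPaths file is uploaded to S3 once, and each micro-batch then only needs a new data location in the COPY statement.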

Expanding a bit on the use of Spark (and just for real time, as we still use the full-fledged Hadoop-based Snowplow process for batch): We are noticing that we are starting to use Snowplow more as a library than a process, which I think still gets a lot of value from the core offer you have as all the enriching, shredding, canonical events. Do you think this is a bad way of leveraging Snowplow? Or, on the contrary, do you see this as a potential way to mark the division between library and process, so that Snowplow can be used from different technologies?

Thanks a lot again, and looking forward to your opinions.

Carlo

dani...@simplybusiness.co.uk

Apr 6, 2016, 9:09:02 AM
to Snowplow
Hi there,

To provide a little more context, we decided to use Spark because we want to use Snowplow to provide data and intelligence to other systems, not just for analytics. We tried first by using the default KCL implementation and then adding new apps that consumed from the enriched stream, but we found it cumbersome. The main reason is that you need to define all the Kinesis Streams ahead of time and configure them for all our environments (we use integration, staging and prod). It's also difficult to see the whole picture of what the system does when the code is distributed across several projects. In comparison, it's much more convenient to use Spark's DStreams to first enrich the raw events (using scala-common-enrich) and then pass them to other 'apps'.

Apart from that, we are also exploring using PySpark for data modelling, so it's simpler to have everything running on the same cluster.

Cheers,
Dani

Alex Dean

Apr 6, 2016, 8:46:06 PM
to Snowplow
Hey Dani, Carlo,

Lots of interesting thoughts in there! I'll reply chunk by chunk if that's okay:


> Is there a way to achieve this (EnrichedEvent -> TSV)?
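
A minimal sketch of one way this could be approached on the JVM, assuming the event is a plain bean with nullable fields: read the fields by reflection in an explicit column order and join them with tabs. This is not the Snowplow implementation. `SampleEvent` is a hypothetical stand-in for `EnrichedEvent`, `BeanToTsv` is an illustrative name, and the column list would have to match the column order of your `atomic.events` table. How nulls and embedded delimiters should be encoded depends on the options of your COPY command.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EnrichedEvent: a bean with nullable String fields.
class SampleEvent {
    String app_id;
    String platform;
    String event_id;
    String se_label;
}

class BeanToTsv {
    // Reads the named fields in the given order and joins them with tabs.
    // Null becomes an empty cell; tabs and line breaks inside a value are
    // replaced with spaces so that a value cannot break the row layout.
    static String toTsv(Object bean, List<String> columns) {
        List<String> cells = new ArrayList<>();
        for (String column : columns) {
            try {
                Field field = bean.getClass().getDeclaredField(column);
                field.setAccessible(true);
                Object value = field.get(bean);
                cells.add(value == null ? "" : value.toString().replaceAll("[\\t\\r\\n]", " "));
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot read column " + column, e);
            }
        }
        return String.join("\t", cells);
    }
}
```

Passing the column list explicitly keeps the output order independent of the order in which reflection happens to return fields.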



> we do have our strategic goals of using Spark for our real time event and analysis pipelines

Having a Spark/Spark Streaming host for the Snowplow enrichment process, alongside our existing Scalding and KCL options, is definitely an interesting idea. Feel free to raise a PR!


> We are noticing that we are starting to use Snowplow more as a library than a process, which I think still gets a lot of value from the core offer you have as all the enriching, shredding, canonical events.

Really interesting point. When we created Snowplow originally, our plan was to support multiple reference implementations of each component in different languages (so e.g. imagine a Scala Enrich vs a .NET Enrich). In the intervening years a couple of things happened:
  1. We accumulated a ton of valuable validation and enrichment logic in Scala (essentially, Scala Common Enrich), which became a large barrier to porting this business logic into another language
  2. Scala effectively "won" the data engineering war, being natively supported by Hadoop, Spark, Flink, Samza, Storm...

Combine these two and what we found was that by extracting our core validation and enrichment into a core Scala library, we were then able to use this library in multiple runtimes (Scalding and KCL, with others coming in the future). So yes, I think using Snowplow components in library form has merit in support of porting enrichment to a new runtime like Spark or Flink.
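
To make the library-versus-runtime split concrete, here is a toy sketch, not Snowplow code: the core logic is a pure function, and each host decides only how records reach it. `EnrichCore`, `BatchHost` and `StreamHost` are hypothetical names, and the "enrichment" is a trivial placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

class EnrichCore {
    // Stand-in for the shared library: a pure function from a raw record to an
    // enriched record, with no knowledge of where the input comes from.
    static String enrich(String raw) {
        return raw.trim().toLowerCase(Locale.ROOT);
    }
}

class BatchHost {
    // Batch-style host: applies the core function to a complete, finite input.
    static List<String> run(List<String> input, Function<String, String> core) {
        List<String> out = new ArrayList<>();
        for (String line : input) {
            out.add(core.apply(line));
        }
        return out;
    }
}

class StreamHost {
    private final Function<String, String> core;
    private final List<String> sink = new ArrayList<>();

    StreamHost(Function<String, String> core) {
        this.core = core;
    }

    // Stream-style host: applies the same function to one record at a time.
    void onRecord(String record) {
        sink.add(core.apply(record));
    }

    List<String> emitted() {
        return sink;
    }
}
```

Both hosts produce the same output for the same records, which is what makes adding a further runtime mostly a matter of writing another thin host.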

> We tried first by using the default KCL implementation and then adding new apps that consumed from the enriched stream, but we found it cumbersome. The main reason is that you need to define all the Kinesis Streams ahead of time and configure them for all our environments (we use integration, staging and prod). It's also difficult to see the whole picture of what the system does when the code is distributed across several projects.

This is the whole microservice versus monolith debate :-) It's possible to do either with Snowplow - you can compose pipelines using our specific apps (Stream Enrich, Kinesis S3, Elasticsearch Sink etc), with streams as the glue in-between, or you could take the underlying libraries and compile them into a monolith like a Spark Streaming job which does enrichment and, say, event data modeling. The microservice approach places more challenges on your devops team; the monolith approach is more developer-time-intensive. We are firmly in favour of the microservice approach at Snowplow - we prefer smaller composable units of work ("async microservices") which can be assembled together in flexible ways. With things like Docker, Kubernetes, Mesos, Kafka Streams and AWS Lambda there is a lot of great tooling consolidating around the async microservices approach - the next 12 months should be very interesting!

> I go to my Spark UI and I can nicely see my 2 long running streaming Jobs running there and their event rate, etc

You are right - it's nice to have a UI to monitor your real-time pipeline holistically - but even if you can mandate using Spark for all jobs (even when some would fit Lambda or KCL better), you are still only seeing half the picture, because the behaviour of your Kinesis streams (shard merges/splits etc) isn't represented in that UI. There's no good solution at the moment - we have a new monitoring and autoscaling fabric (including UI) in heavy R&D for the Snowplow Real-Time Managed Service; I hope we can share more information about this publicly later this summer.

Cheers,

Alex


dani...@simplybusiness.co.uk

Apr 7, 2016, 10:16:12 AM
to Snowplow
Thanks for the info Alex, it's really helpful.

We can't share the Spark pipeline yet because it has been built mainly around our needs; we'll consider it if we find a way to define custom flows. That said, we'll probably work on adding custom JRuby enrichments to scala-common-enrich, as we would like to be using a vanilla version.

Best,
Dani

Alex Dean

Apr 7, 2016, 8:33:19 PM
to Snowplow
Thanks Dani,

A JRuby Scripting Enrichment would be an awesome contribution - let us know if we can help you make that happen!

Cheers,

Alex