Retrieving Kafka offsets used for an ingestion job


Will Bertelsen

Jul 18, 2016, 4:57:41 PM
to gobblin-users
Hi all,

I have a Kafka -> HDFS ingestion pipeline coming together as described in the docs (using the MapReduce method), but one thing I need to be able to do is record the high-watermark Kafka offset for each topic/partition processed by a job.
I've managed to hook in a proof of concept by overriding `publishMetadata` in my publisher, but this seems a little messy. I know this data is recorded in the FsStateStore, but accessing it externally seems non-trivial. Any tips / best practices for this?
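
For illustration, that kind of hook can look roughly like the sketch below. This is a minimal sketch only: the `gobblin.*` package names assume the pre-Apache-incubation namespace, and the `kafka.topic` / `kafka.partition` / `kafka.high.watermark` keys are placeholders, not the actual work-unit property names Gobblin uses.

```java
// Sketch only: placeholder property names, pre-Apache gobblin.* packages assumed.
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import gobblin.configuration.State;
import gobblin.configuration.WorkUnitState;
import gobblin.publisher.BaseDataPublisher;

public class OffsetRecordingPublisher extends BaseDataPublisher {

  public OffsetRecordingPublisher(State state) throws IOException {
    super(state);
  }

  @Override
  public void publishMetadata(Collection<? extends WorkUnitState> states) throws IOException {
    // Collect the highest offset seen per topic/partition across all work units.
    Map<String, Long> highWatermarks = new HashMap<>();
    for (WorkUnitState wus : states) {
      // Placeholder keys: substitute whatever properties actually carry these values.
      String key = wus.getProp("kafka.topic") + "-" + wus.getProp("kafka.partition");
      long offset = Long.parseLong(wus.getProp("kafka.high.watermark", "-1"));
      highWatermarks.merge(key, offset, Math::max);
    }
    // Write highWatermarks somewhere external (e.g. a small JSON file on HDFS)
    // before delegating to the normal metadata publishing.
    super.publishMetadata(states);
  }
}
```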

Will Bertelsen

Aug 15, 2016, 7:54:43 PM
to gobblin-users
If any future searchers are curious: I ended up serializing the offsets with each record I emit and rolling them up in a later MR job to find the largest offset for each topic/partition.
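
For anyone taking the same route, the roll-up itself can be a very small Hadoop job. A sketch, assuming the mapper has already emitted `<topic>-<partition>` as the key and the offset serialized with each record as the value:

```java
// Sketch of the roll-up reducer: keeps the maximum offset per topic/partition key.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxOffsetReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

  @Override
  protected void reduce(Text topicPartition, Iterable<LongWritable> offsets, Context ctx)
      throws IOException, InterruptedException {
    long max = Long.MIN_VALUE;
    for (LongWritable offset : offsets) {
      max = Math.max(max, offset.get());
    }
    ctx.write(topicPartition, new LongWritable(max));
  }
}
```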

Shirshanka Das

Aug 15, 2016, 10:08:24 PM
to Will Bertelsen, gobblin-users
Thanks for reporting your approach, Will!

Using the metadata emitted from Gobblin's publisher seems like a better approach on paper. Could you explain what issues you ran into while trying to do that and why the solution was hacky? 

Also, if you could explain why you want to record the high watermark externally, that might help us understand the context better. For example, do you want to affect future runs of Gobblin based on this value, or merely record and observe it for monitoring and troubleshooting reasons?

Shirshanka




Will Bertelsen

Aug 16, 2016, 3:45:22 PM
to gobblin-users, wbert...@foursquare.com
So I actually did a full implementation using publishMetadata and it worked fine. But in the end my requirement was "for each topic and each partition, record the highest offset seen for a record timestamped with a particular UTC day". Using publishMetadata I was able to map a Gobblin job ID to a set of offsets, but that wasn't sufficient for runs that crossed UTC midnight.

Stepping back: I need the high-watermark offsets for each day of my UTC-day-partitioned output so that server code can ingest the Gobblin-produced data from HDFS and then consume directly from Kafka starting at the last offset seen by the offline jobs.
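
To fold the UTC-day requirement into the roll-up sketched earlier in the thread, only the key changes: it becomes (UTC day, topic, partition) instead of (topic, partition). A sketch of the mapper side, using a purely hypothetical tab-separated record layout:

```java
// Sketch only: the tab-separated layout (topic, partition, offset, epoch-ms
// timestamp, payload) is hypothetical; the point is the composite key.
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class DayTopicPartitionMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

  @Override
  protected void map(LongWritable ignored, Text line, Context ctx)
      throws IOException, InterruptedException {
    String[] fields = line.toString().split("\t");
    String topic = fields[0];
    String partition = fields[1];
    long offset = Long.parseLong(fields[2]);
    long timestampMs = Long.parseLong(fields[3]);

    // Derive the UTC day the record belongs to, e.g. "2016-08-17".
    String utcDay = Instant.ofEpochMilli(timestampMs).atZone(ZoneOffset.UTC).toLocalDate().toString();

    ctx.write(new Text(utcDay + "|" + topic + "|" + partition), new LongWritable(offset));
  }
}
```

The reducer from the earlier sketch then keeps the maximum offset per (day, topic, partition) key unchanged.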


Shirshanka Das

Aug 16, 2016, 10:35:24 PM
to Will Bertelsen, gobblin-users
Thanks for the context. I'm assuming that your records are already annotated with the timestamp that you are partitioning by.

Could you have a case where records with newer timestamps occur earlier in the Kafka stream than records with older timestamps? 

This is pretty common in most activity event data since the timestamp is generally attached by the producer and the commit order is decided at the broker based on when the record finally reaches it. 

If the above is true, you could have a case where you miss data (newer records with older offsets) when you combine the offline copy with the online stream. 


Shirshanka

Will Bertelsen

Aug 17, 2016, 7:42:22 PM
to gobblin-users, wbert...@foursquare.com
Hmm, I thought a little about late/out-of-order records with respect to when to roll up a day, but that's a good point with respect to the offsets.
Maybe we can partition our output by something that doesn't depend on information in the records themselves, to guarantee that when we report the high-watermark offsets we are also presenting all the data for messages below those offsets.

Shirshanka Das

Aug 18, 2016, 10:08:35 AM
to Will Bertelsen, gobblin-users
You could just partition the data using a global sequence number that you increment every time the flow runs. 

Each folder would contain a contiguous run of records across all the partitions. You could drop a metadata file into the folder to record the per-partition offset range contained in that folder, and you could also include the range of timestamps seen by that run. The former is an exact range, while the latter is a hint that you can use for partition pruning in offline jobs that need to access data with range predicates on the timestamp.
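
For concreteness, the per-folder metadata file could look something like this (field names and values are purely illustrative, not anything Gobblin produces):

```json
{
  "run_sequence": 42,
  "offset_ranges": {
    "my_topic-0": { "start": 105000, "end": 118342 },
    "my_topic-1": { "start": 104870, "end": 117991 }
  },
  "timestamp_range": {
    "min": "2016-08-17T00:03:11Z",
    "max": "2016-08-17T06:41:55Z"
  }
}
```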


Shirshanka

