Kafka changelog topic to file


Clemens Valiente

Apr 5, 2017, 12:59:04 PM
to gobblin-users
Hi,

I have several Kafka topics that are basically CDC changelogs. I would like Gobblin to occasionally read such a topic and materialize it into a new snapshot of the underlying table.
For that I would need:
- A Kafka source that always starts at the beginning of the topic. I had a look, and it seems impossible to do this without completely reimplementing the KafkaSource, since all the methods that determine the starting offsets are private. Or am I missing something here?
- Deduplication of records before they are written. The quality checker looked like a good candidate, but I don't think I can use it: the row-level quality checker only sees individual rows, not the overall context, and the task-level checker doesn't do what the documentation says and only looks at the state of the task output.

I was thinking of storing the rows in a RocksDB and keeping that in the State properties, but that last part sounds ugly to me. Are there any other ways of doing this?

Shirshanka Das

Apr 13, 2017, 3:01:34 AM
to Clemens Valiente, gobblin-users
Hi Clemens,
   Does ingesting the Kafka topic into HDFS, followed by running Gobblin compaction (http://gobblin.readthedocs.io/en/latest/user-guide/Compaction/), solve your problem?

  We generally store the CDC data on HDFS (e.g. ingested into /data/dbchanges) separately from the compacted snapshots (compaction applied to /data/dbchanges and published to /data/dbsnapshots).
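
  (For reference, a compaction job over a layout like that is driven by a small job properties file. The sketch below is only illustrative; the exact property names should be double-checked against the Compaction page linked above.)

# Illustrative compaction job config for the layout described above;
# verify the property names against the Compaction documentation.
job.name=DbChangesCompaction
job.group=compaction

# Raw CDC data ingested from Kafka lands here ...
compaction.input.dir=/data/dbchanges
# ... and the compacted snapshots are published here.
compaction.dest.dir=/data/dbsnapshots

# Choose which datasets under the input dir get compacted.
compaction.whitelist=*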

  I'm not sure I follow why you would have to periodically re-consume the entire Kafka topic, versus just continually ingesting it into HDFS and compacting there.

Shirshanka




Clemens Valiente

Apr 15, 2017, 5:13:26 AM
to gobblin-users, csieb...@gmail.com
I had a look at both the MapReduce and Hive compactors previously, but they didn't do what I wanted.
Since the messages have no timestamp (that is rather hard to get out of the MySQL binlog), they need to be deduplicated in exactly the order they come out of Kafka.
I managed it now by writing a Kafka extractor that first pulls all records into a RocksDB, deduplicating them along the way, before emitting them. It requires reading the whole snapshot, but that scales with the number of partitions, and log compaction on the Kafka side keeps the data size close to that of the original table.
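
For illustration, the core of that is just RocksDB's overwrite-on-put behaviour: writing each changelog entry under the row's primary key leaves exactly one (the latest) value per key. A minimal standalone sketch with made-up keys, values and path, using the RocksDB Java API:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class RocksDbDedupSketch {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options opts = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(opts, "/tmp/dedup-sketch")) {

      // Changelog entries in Kafka order: key = primary key, value = full row.
      // A later put() for the same key overwrites the earlier one, which is
      // exactly the "last change wins" deduplication needed here.
      db.put("pk-42".getBytes(), "row version 1".getBytes());
      db.put("pk-42".getBytes(), "row version 2".getBytes());
      db.put("pk-43".getBytes(), "row version 1".getBytes());

      // Reading the DB back gives one deduplicated row per primary key
      // (a delete event in the changelog would map to db.delete(key)).
      try (RocksIterator it = db.newIterator()) {
        for (it.seekToFirst(); it.isValid(); it.next()) {
          System.out.println(new String(it.key()) + " -> " + new String(it.value()));
        }
      }
    }
  }
}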



Shirshanka Das

Apr 15, 2017, 11:25:03 AM
to Clemens Valiente, gobblin-users
Interesting... are you planning to send a PR our way? Would definitely like to look at it. 

IIRC the MySQL binlog should be able to provide a GTID / binlog offset that could be used for compaction. What process are you using to read from the binlog and write to Kafka?

Shirshanka


Clemens Valiente

Apr 24, 2017, 4:17:20 AM
to gobblin-users, csieb...@gmail.com
The code is not exclusively mine, so I have to align internally before I can share it.

But in the end it was not very complicated. It is a subclass of the KafkaExtractor that, on the first call to readRecordImpl(), reads the entire Kafka topic into a RocksDB (calling super.readRecord() until it returns null) and creates a RocksIterator; every subsequent call to readRecordImpl() returns the iterator's current value and advances it.
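
In case it helps anyone trying the same thing, here is roughly that shape, as a sketch only: the KafkaExtractor base class and the exact readRecordImpl() signature are stand-ins based on the description above, and primaryKeyOf()/serialize()/deserialize() are placeholders for the schema-specific parts, not the actual code.

import java.io.IOException;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// Sketch only: "KafkaExtractor" stands in for the Gobblin base class, whose
// readRecordImpl() is assumed to return the next record from the topic, or
// null once the topic is exhausted.
public abstract class DedupingKafkaExtractor<D> extends KafkaExtractor<D> {

  private final RocksDB db;        // opened elsewhere, e.g. in the constructor
  private RocksIterator iterator;  // stays null until the topic has been drained

  protected DedupingKafkaExtractor(RocksDB db) {
    this.db = db;
  }

  @Override
  protected D readRecordImpl(D reuse) throws IOException {
    if (iterator == null) {
      drainTopicIntoRocksDb(reuse);   // first call: read the whole topic
    }
    if (!iterator.isValid()) {
      return null;                    // every deduplicated row has been emitted
    }
    D record = deserialize(iterator.value());
    iterator.next();                  // position the iterator for the next call
    return record;
  }

  private void drainTopicIntoRocksDb(D reuse) throws IOException {
    try {
      D record;
      while ((record = super.readRecordImpl(reuse)) != null) {
        // Last write wins: a later changelog entry for the same primary key
        // simply overwrites the earlier one.
        db.put(primaryKeyOf(record), serialize(record));
      }
    } catch (RocksDBException e) {
      throw new IOException("Failed to stage records in RocksDB", e);
    }
    iterator = db.newIterator();
    iterator.seekToFirst();
  }

  // Placeholders for the schema-specific pieces.
  protected abstract byte[] primaryKeyOf(D record);
  protected abstract byte[] serialize(D record);
  protected abstract D deserialize(byte[] bytes);
}

Note that the RocksIterator will emit rows in key order rather than the original Kafka order.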