Kafka Connect JDBC Predictable Bulk Extracts


Micah Whitacre

Dec 5, 2016, 2:58:09 PM
to Confluent Platform
Kafka Connect JDBC's bulk mode is really great for quickly creating a snapshot of the data in a table and pushing it into Kafka.  If my use case were unbounded this wouldn't be much of an issue, but I currently have a requirement to re-create snapshots of the tables in HDFS as Avro/CSV files.  The issue I have is ensuring I know when the bulk extract has run and completed, so that I don't recreate the snapshot while data is still being fed to Kafka.  Knowing when a bulk extract runs is tricky because it depends on when the connector was started, right?  If I knew when the bulk extract ran, I could schedule my file generation much later to ensure no overlap.
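
For context, here is roughly the source config I mean (a standalone .properties sketch; the connection string and table name are placeholders):

    name=jdbc-bulk-snapshot
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    # placeholder connection string
    connection.url=jdbc:postgresql://db-host:5432/mydb
    # dump the whole table on every poll
    mode=bulk
    # placeholder table
    table.whitelist=my_table
    topic.prefix=snapshot-
    # re-extract once an hour, measured relative to connector start
    poll.interval.ms=3600000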

Based on this problem/use case, I was wondering about two things:
* Is there any metric/logging that could be used to determine when a bulk extract of a table completed?
* What are the thoughts on adding a cron-like scheduling config for the bulk extract, so that when it pulls the data is predictable rather than relative to connector start time and polling interval?

thanks.


Gwen Shapira

Dec 6, 2016, 2:36:35 AM
to confluent...@googlegroups.com
If I understand you correctly, your plan is to run the JDBC source in
"bulk" mode and use the poll.interval.ms config to control how
frequently you bulk-load the data, and then have the HDFS connector
"drain" the topic before dumping another snapshot?

I have to admit that orchestrating connectors with these requirements
sounds a bit tricky...

A few things I'd do:
1. Make sure you have a significant interval between JDBC connector
polls (compared to the time the bulk load takes) to avoid a scenario
where a second bulk load starts before the first finishes.
2. I'd use the producer JMX metrics on the source connector tasks to
check when they stopped producing, as an indicator of when the load
into Kafka finished.
3. The HDFS sink by default will just add the additional bulk loads into
the same HDFS dir and Hive table (named after the topic in use), so you
may need to hack around that a bit, depending on the exact
requirements.
4. I may consider using an actual cron job that starts stand-alone
connectors with the appropriate configuration for every bulk load
(both JDBC and HDFS sides) and then stops the connectors (using the
REST API) when the bulk load finishes (see point 1; a rough sketch
follows this list). I may even update the configuration to write to a
different topic every time, to avoid mixing up snapshots. Then you'd
want another cron job to clean up the old topics...
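
Here's the rough sketch for point 4 (the host, file path, and connector name are made up; the REST endpoints are the standard Connect ones):

    #!/bin/bash
    # Hypothetical cron-driven bulk load: create the connector, wait,
    # then remove it through the Connect REST API.
    CONNECT_URL=http://connect-host:8083

    # Start the JDBC bulk connector from a JSON config file (made-up path)
    curl -s -X POST -H "Content-Type: application/json" \
         --data @/etc/connect/jdbc-bulk.json \
         "$CONNECT_URL/connectors"

    # Crude completion check: just wait. A real check would watch the
    # producer JMX metrics (point 2) for the send rate dropping to zero.
    sleep 3600

    # Tear the connector down so the next cron run starts a fresh load
    curl -s -X DELETE "$CONNECT_URL/connectors/jdbc-bulk-snapshot"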

As I said, a bit tricky.

We do have "batch mode" plans for Connect, and the "cron" configuration
looks like something that would fit in. We haven't specced the details
yet, though. Feel free to give it some thought and perhaps contribute a
few improvements to make your use case easier.

Gwen



--
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog

Micah Whitacre

Dec 6, 2016, 7:15:46 PM
to Confluent Platform
Thanks for the suggestions.  The last option, starting the connectors on a cron schedule to force the extract start time, is what we had been looking at: set the poll interval to 24+ hours, have a cron job restart the JDBC connectors at, say, 1 AM, and run the HDFS file generation at, say, 1 PM, on the assumption that a full batch extract takes less than 24 hours.
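
So something like this in cron (the script names are our own hypothetical wrappers around the Connect REST API):

    # 1 AM: restart the JDBC connectors so the bulk extract starts predictably
    0 1 * * *  /opt/etl/restart-jdbc-connectors.sh
    # 1 PM: generate the HDFS snapshot files, assuming the extract is done
    0 13 * * * /opt/etl/generate-hdfs-snapshot.sh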

In our case we technically weren't looking at using the HDFS connector, but rather something like an Oozie-coordinated MapReduce job or Spark application, so it isn't entirely a matter of coordinating schedules between connectors.

Is the "batch" concept specific to the JDBC connector or something built into Kafka Connect in which case there'd be a KIP?  If a KIP is there a link?

Micah Whitacre

Feb 23, 2017, 6:38:40 PM
to Confluent Platform
Revisiting this conversation: Gwen, is there a discussion/JIRA/KIP for the "batch" mode you mentioned?  I'd like to read up on the proposal and then contribute use cases/feedback in the appropriate location.

One of the things we've been trying to talk through is whether there is any mechanism in Kafka Connect and source connectors that could be used to mark the "end" of a batch, for completeness' sake.  I realize this is specific to bounded datasets vs. unbounded ones, but in use cases like generating files only once we know the batch is complete, it would be helpful.  It would, however, require exposure to records as a batch rather than handling them individually.

A thought I had: using the simple transform functionality in newer versions of Kafka Connect, we could stamp a "batch id" onto the data as it flows by.  However, that might require the source connector to know the start and end of the batch.

Gwen Shapira

Feb 24, 2017, 7:14:20 PM
to confluent...@googlegroups.com
It isn't much of a proposal at this stage:
https://issues.apache.org/jira/browse/KAFKA-2483. And it doesn't
include batch identifiers or markers.

If you want a second pair of eyes on your proposal, I'll be happy to
take a look :)

A few things that may help mark "batches":
* KIP-98, adding transactions: we could implement a "batch" as an
atomic transaction, and then read-committed consumers would only see
completed batches. This sounds like the best fit for your use case,
but it will require waiting for the feature to land.
* KIP-82, adding headers: have a "header-only" control message mark
the end of the batch. Or, as you suggested, add a batch ID to the
event itself. Generating unique batch IDs might get a bit tricky.
Also, it still wouldn't tell you when a batch ended.
* Kafka Streams just added session windows. I'm wondering if these can
be used to detect batches after the fact.
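
For the KIP-98 option, the consumer side of your snapshot job should end up being a one-line config (a sketch; this setting will only exist once transactions land):

    # consumer.properties (hypothetical until KIP-98 ships)
    bootstrap.servers=broker:9092
    group.id=snapshot-builder
    # only read events from committed transactions, i.e. completed batches
    isolation.level=read_committed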

Gwen Shapira

Feb 24, 2017, 10:13:44 PM
to confluent...@googlegroups.com
Another option:

We have SMTs now, allowing you to "inject" fields into the data between
the source connector and Kafka.
Perhaps injecting a timestamp would help?
Or inject a static field carrying a batch ID, and modify the
configuration after each batch to bump the ID.
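
Something like the built-in InsertField transform (the field names and ID value here are just illustrative):

    transforms=batchid
    transforms.batchid.type=org.apache.kafka.connect.transforms.InsertField$Value
    # static field stamped on every record; bump the value per batch
    transforms.batchid.static.field=batch_id
    transforms.batchid.static.value=2017-02-24-run1
    # optionally also stamp the record timestamp
    transforms.batchid.timestamp.field=ingest_ts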

SMTs were added in 0.10.2.0. If your brokers are 0.10.0.0 and up, you
can upgrade just Connect to 0.10.2.0 and leave the brokers as is.

Gwen