Kafka Connect - REST Service as Source


Saravanan Tirugnanum

Jan 4, 2016, 9:41:08 AM
to Confluent Platform
I have created a custom connector that reads from RESTful services as a source, iterating through the rows with static partitions. Here is my step-by-step approach (a rough sketch follows the list):
1. Call the REST service, pull rows/events in batches (999 records at a time), and poll iteratively.
2. Populate SourceRecords for the batch and write the events into Kafka (this happens through the framework).
3. Call the delete service to remove the rows from the source.
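A rough sketch of the task, with RestClient and Row standing in as placeholders for my actual REST client code:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class RestSourceTask extends SourceTask {

    private static final int BATCH_SIZE = 999;

    private RestClient client;   // placeholder wrapper around the REST service
    private String topic;

    @Override
    public void start(Map<String, String> props) {
        topic = props.get("topic");
        client = new RestClient(props.get("rest.url"));  // placeholder
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Step 1: pull the next batch of rows/events from the REST service.
        List<Row> rows = client.fetchBatch(BATCH_SIZE);

        // Step 2: wrap each row in a SourceRecord; the framework writes these to Kafka.
        List<SourceRecord> records = new ArrayList<>();
        for (Row row : rows) {
            Map<String, String> sourcePartition = Collections.singletonMap("source", "rest-service");
            Map<String, Long> sourceOffset = Collections.singletonMap("row_id", row.getId());
            records.add(new SourceRecord(sourcePartition, sourceOffset, topic,
                    Schema.STRING_SCHEMA, row.getPayload()));
        }

        // Step 3 (deleting the published rows from the source) is the part I am unsure about.
        return records;
    }

    @Override
    public void stop() {
        client.close();
    }

    @Override
    public String version() {
        return "0.1";
    }
}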

I would appreciate recommendations on the points below.
1. Is this approach sound? It works for normal scenarios, but I want to understand failure scenarios and how to handle them.
2. I need to ensure the delete happens only after all messages in a batch are published successfully. Where should the delete be called? Inside stop()?
3. If messages are not published successfully due to a network issue or another failure, does the framework handle exceptions and retry to ensure exactly-once delivery?
4. Since I am deleting the records after I read and publish them to Kafka, how can offsets be tracked in this case?

Regards
Saravanan

Ewen Cheslack-Postava

Jan 4, 2016, 10:56:54 AM
to Confluent Platform
On Mon, Jan 4, 2016 at 9:41 AM, Saravanan Tirugnanum <vtsa...@gmail.com> wrote:
I have created a custom connector that reads from RESTful services as a source, iterating through the rows with static partitions. Here is my step-by-step approach:
1. Call the REST service, pull rows/events in batches (999 records at a time), and poll iteratively.
2. Populate SourceRecords for the batch and write the events into Kafka (this happens through the framework).
3. Call the delete service to remove the rows from the source.

I would appreciate recommendations on the points below.
1. Is this approach sound? It works for normal scenarios, but I want to understand failure scenarios and how to handle them.

Probably the biggest issue is that you haven't mentioned how you track offsets. If something crashes/fails, how do you decide where to start again? I'm guessing that right now you just restart from the beginning. If you implement what I describe below for question (2), then you should automatically get at least once semantics without having to implement anything yourself -- the framework takes care of tracking offsets in a way that it can be sure all records get delivered even in case of failures.

However, note that you *will* need to load offsets when tasks start up and resume from those offsets. See this section of the developer guide: http://docs.confluent.io/2.0.0/connect/devguide.html#resuming-from-previous-offsets
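As a rough illustration (not your actual code; the partition/offset key names here are just placeholders), loading the last offset in start() looks something like this:

// Inside your SourceTask subclass; "context" is the SourceTaskContext that the
// framework injects into every task. The key names are placeholders.
@Override
public void start(Map<String, String> props) {
    Map<String, String> sourcePartition = Collections.singletonMap("source", "rest-service");
    Map<String, Object> lastOffset = context.offsetStorageReader().offset(sourcePartition);
    if (lastOffset != null) {
        Long lastRowId = (Long) lastOffset.get("row_id");
        // Resume fetching from the row after lastRowId instead of starting from the beginning.
    }
    // ... set up the REST client, etc.
}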
 
2. I need to ensure the delete happens only after all messages in a batch are published successfully. Where should the delete be called? Inside stop()?

You should override the commit() method: http://docs.confluent.io/2.0.0/connect/javadocs/org/apache/kafka/connect/source/SourceTask.html#commit(). It will be invoked periodically by the framework. "Commit" in your case simply means deleting the records from the source system.

commit() should be invoked before the task is fully shut down, so you shouldn't need any separate logic in stop() aside from code to ensure any outstanding requests are stopped promptly.
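In terms of the sketch earlier in the thread, commit() might look roughly like this (deleteUpTo() is a placeholder for whatever your delete REST call is, and lastPolledRowId would be updated in poll() as batches are returned):

// Inside the SourceTask subclass; lastPolledRowId holds the id of the last row returned from poll().
private volatile long lastPolledRowId = -1L;

@Override
public void commit() throws InterruptedException {
    // "Commit" here just means removing the already-published rows from the source system.
    if (lastPolledRowId >= 0) {
        client.deleteUpTo(lastPolledRowId);   // placeholder REST delete call
    }
}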
 
3. If messages are not published successfully due to a network issue or another failure, does the framework handle exceptions and retry to ensure exactly-once delivery?

Exactly once delivery requires coordination between source and sink systems. While it is possible for some systems (e.g. the HDFS connector can accomplish this by storing offset information in HDFS and controlling offsets via the SinkContext object), it is not going to be generally possible. By default, Connect will aim for at least once delivery.

However, in terms of delivery, the default configuration for the producer in Connect should handle these issues -- it has settings much like MirrorMaker's, retrying indefinitely in order to aim for exactly-once delivery if at all possible and, more importantly, to ensure there is no data loss.

If you *do* encounter a case where something gets lost, that's a bug and we'd like to get a JIRA filed for it.
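For reference, the "no data loss" style settings being described look roughly like this (an approximation for illustration only -- the worker configures this producer internally, so you don't set it up yourself):

// Illustrative approximation of the worker's internal producer settings -- not user code.
Properties props = new Properties();
props.put(ProducerConfig.ACKS_CONFIG, "all");                                  // wait for all in-sync replicas
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)); // retry "indefinitely"
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(Long.MAX_VALUE));  // block rather than drop messages
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");          // preserve ordering across retries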
 
4. Since I am deleting the records after I read and publish them to Kafka, how can offsets be tracked in this case?

If you only delete records as described in my response to (2), you shouldn't have any case where you encounter offsets that have already been deleted -- the commit process should ensure that doesn't happen.

-Ewen



Saravanan Tirugnanum

Jan 4, 2016, 11:39:43 AM
to Confluent Platform
Thanks a lot Ewen for the prompt reply. This helps.

I assume the commit() method is executed only after the messages returned from poll() have been published successfully, including any retries after failures.

Regards
Saravanan 


Saravanan Tirugnanum

Jan 5, 2016, 2:53:16 PM
to Confluent Platform
Hi Ewen,

I have checked the Connect source code but couldn't find where the commit() method is called. Can you please clarify where it is invoked from?

I want to make sure it is called after the poll() method (i.e., after publishing to Kafka).

Regards
Saravanan

Saravanan Tirugnanum

Jan 6, 2016, 4:08:29 PM
to Confluent Platform
Hi

I tried overriding the commit() method, but it is not getting invoked. The delete works fine if I include it inside the stop() method. Can you tell me whether this approach is acceptable?

Also, when we run in distributed mode (with different rest.port values) on the same host, the second worker does not pick up any tasks. When I try to register the connector using the REST API, it says the connector already exists, but the tasks are still not picked up. Am I missing something?

Regards
Saravanan

Saravanan Tirugnanum

Feb 1, 2016, 12:42:59 PM
to Confluent Platform
Hi Ewen

Can you please also look into this?

Regards
Saravanan