Kafka Connect - Source Task commit() Method Examples


Steven Deutscher-Kobayashi

Jun 13, 2016, 11:34:19 AM
to Confluent Platform
Hello,

I am trying to develop a connector which relies on its own offset management (as opposed to the built-in management provided by Connect).

My connector is being developed using Confluent 2.0.0, but it also includes the fix from https://issues.apache.org/jira/browse/KAFKA-3225

Are there any examples of source connectors that implement commit()? If anyone could elaborate on the approach to implementing commit(), it would be really helpful, as I am running into quite a few issues. The core issue for me is: how do I know which records actually made it to Kafka when commit() is invoked?

Thanks for your time,
Steven

Ewen Cheslack-Postava

Jun 14, 2016, 12:33:06 AM
to Confluent Platform
It'll be everything that you returned before commit() was invoked. Because this can be tricky to manage and you have to track outstanding message offsets yourself, we introduced commitRecord() in the latest release (3.0.0): http://docs.confluent.io/3.0.0/connect/javadocs/org/apache/kafka/connect/source/SourceTask.html#commitRecord-org.apache.kafka.connect.source.SourceRecord- The commit() version is good if you can bulk-acknowledge messages in your source system. commitRecord() is more useful if you acknowledge messages individually (or at least explicitly list all messages to ack, even if you can batch them).

-Ewen
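A minimal, self-contained Java sketch of the commit() approach for a source system that supports bulk acknowledgement. It deliberately does not use the Kafka Connect classes: `BulkAckSource`, `BulkAckTask`, `InMemorySource` and `BulkAckDemo` are hypothetical names, poll() returns plain sequence numbers instead of SourceRecords, and it assumes, per the reply above, that everything poll() returned before commit() was invoked has reached Kafka. It is single-threaded and ignores any concurrency between poll() and commit() that a real worker may introduce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical source system that can acknowledge every message up to a
// given sequence number in a single call (a "bulk ack").
interface BulkAckSource {
    List<Long> fetch(int max);     // next batch of message sequence numbers
    void ackUpTo(long sequence);   // acknowledge all messages <= sequence
}

// Simplified stand-in for a SourceTask. A real task would extend
// org.apache.kafka.connect.source.SourceTask and return SourceRecords.
class BulkAckTask {
    private final BulkAckSource source;
    // Highest sequence number handed back from poll() so far (-1 = none).
    private long lastReturned = -1;

    BulkAckTask(BulkAckSource source) {
        this.source = source;
    }

    // Models poll(): remember the last sequence number in each batch.
    List<Long> poll() {
        List<Long> batch = source.fetch(3);
        if (!batch.isEmpty()) {
            lastReturned = batch.get(batch.size() - 1);
        }
        return batch;
    }

    // Models commit(): assumes everything returned by poll() before this
    // call has reached Kafka, so one bulk ack covers all of it.
    void commit() {
        if (lastReturned >= 0) {
            source.ackUpTo(lastReturned);
        }
    }
}

// In-memory source used only to exercise the sketch.
class InMemorySource implements BulkAckSource {
    private final long end;
    private long next = 0;
    long ackedUpTo = -1;

    InMemorySource(long end) {
        this.end = end;
    }

    public List<Long> fetch(int max) {
        List<Long> out = new ArrayList<>();
        while (out.size() < max && next < end) {
            out.add(next++);
        }
        return out;
    }

    public void ackUpTo(long sequence) {
        ackedUpTo = sequence;
    }
}

class BulkAckDemo {
    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(5);
        BulkAckTask task = new BulkAckTask(source);
        task.poll();    // returns [0, 1, 2]
        task.poll();    // returns [3, 4]
        task.commit();  // one bulk ack for everything up to 4
        System.out.println(source.ackedUpTo);  // prints 4
    }
}
```

Tracking only the highest returned sequence number works here because the source accepts cumulative acks; a source that only supports individual acks would need the full set of outstanding IDs instead.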

 

Disclaimer :-

This email and any files transmitted with it are confidential and intended solely for the use of the individual or entity to which they are addressed. If you are not the intended recipient, you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately by e-mail and destroy all copies of this message and any attachments. Any views or opinions presented in this email are solely those of the author and do not necessarily represent those of the company. 

Warning: Although the company has taken reasonable precautions to ensure no viruses are present in this email, the company cannot accept responsibility for any loss or damage arising from the use of this email or attachments.

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platf...@googlegroups.com.
To post to this group, send email to confluent...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/bbdf33b2-1432-4c2c-ae5b-b778316394af%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Thanks,
Ewen

Steven Deutscher-Kobayashi

Jun 14, 2016, 9:26:20 AM
to Confluent Platform
Thanks Ewen that is very helpful.

Sadly, upgrading to 3.0 is not an option for us ATM, but I think we should be okay with commit().

Cheers,
Steven



Ewen Cheslack-Postava

Jun 14, 2016, 2:48:18 PM
to Confluent Platform
Yeah, to clarify, you can absolutely track the info you need yourself within the connector and use the existing commit() API. The new API just lets you handle individual records more quickly (as soon as the ack is received from Kafka) and doesn't require you to track outstanding messages yourself.

-Ewen
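For comparison, a similarly hedged sketch of the commitRecord() style described above, again without the Kafka Connect classes. `IndividualAckSource`, `PolledRecord`, `PerRecordAckTask`, `InMemoryQueue` and `PerRecordAckDemo` are hypothetical names. It assumes each record can carry the ID of the source message it came from (in a real connector that might be recoverable from the record's source offset) and that the framework calls commitRecord() once Kafka has acknowledged that particular record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical source system that acknowledges messages one at a time.
interface IndividualAckSource {
    List<String> fetch(int max);   // next batch of message IDs
    void ack(String messageId);    // acknowledge a single message
}

// Hypothetical stand-in for a SourceRecord that remembers which source
// message it came from.
class PolledRecord {
    final String messageId;

    PolledRecord(String messageId) {
        this.messageId = messageId;
    }
}

// Simplified stand-in for a SourceTask that relies on a per-record callback.
class PerRecordAckTask {
    private final IndividualAckSource source;

    PerRecordAckTask(IndividualAckSource source) {
        this.source = source;
    }

    List<PolledRecord> poll() {
        List<PolledRecord> out = new ArrayList<>();
        for (String id : source.fetch(10)) {
            out.add(new PolledRecord(id));
        }
        return out;
    }

    // Models commitRecord(): called once Kafka has acknowledged this record,
    // so the source message is acked right away and the task keeps no list
    // of outstanding messages.
    void commitRecord(PolledRecord polled) {
        source.ack(polled.messageId);
    }
}

// In-memory queue used only to exercise the sketch.
class InMemoryQueue implements IndividualAckSource {
    private final List<String> pending = new ArrayList<>();
    final Set<String> acked = new LinkedHashSet<>();

    InMemoryQueue(List<String> ids) {
        pending.addAll(ids);
    }

    public List<String> fetch(int max) {
        int n = Math.min(max, pending.size());
        List<String> batch = new ArrayList<>(pending.subList(0, n));
        pending.subList(0, n).clear();
        return batch;
    }

    public void ack(String messageId) {
        acked.add(messageId);
    }
}

class PerRecordAckDemo {
    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue(Arrays.asList("a", "b", "c"));
        PerRecordAckTask task = new PerRecordAckTask(queue);
        List<PolledRecord> records = task.poll();
        // Simulate Kafka acknowledging records out of order; "b" is still in flight.
        task.commitRecord(records.get(2));
        task.commitRecord(records.get(0));
        System.out.println(queue.acked);  // prints [c, a]
    }
}
```

Because acknowledgements arrive per record and possibly out of order, the task does not try to track a single high-water mark; it simply forwards each ack to the source system.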


Mangesh B

Jun 16, 2016, 2:43:39 AM
to Confluent Platform
Hi Ewen,

I believe a bug was identified in the 2.x version of Confluent where the commit() method does not get invoked, and the fix is only available in the 3.x version. Is that not correct?



Steven Deutscher-Kobayashi

Jun 16, 2016, 9:37:49 AM
to Confluent Platform
Hi Mangesh,

You're right about the bug.

For our problem, we have just added that fix on top of Confluent 2.0, since we can't upgrade our brokers at this time.

Cheers,
Steven