Enable Kafka sink connector to insert data from topics to tables as and when sink is up


Nilkanth Patel

Apr 4, 2017, 7:09:28 AM
to Confluent Platform
Hello,

I have developed a Kafka sink connector (using confluent-oss-3.2.0-2.11 and the Connect framework) for my data store (Ampool ADS), which stores data from Kafka topics into corresponding tables in my store.

Everything works as expected as long as the Kafka servers and the ADS servers are up and running.

I need some help/suggestions for a specific use case where events are being ingested into Kafka topics while the underlying sink component (ADS) is down.
The expectation here is that whenever the sink servers come back up, the records that were ingested earlier into the Kafka topics should be inserted into the tables.

Kindly advise how to handle such a case. 

Is there any support available in the Connect framework for this? Or at least some references would be a great help.

Nilkanth Patel.

Ewen Cheslack-Postava

Apr 7, 2017, 2:12:44 AM
to Confluent Platform
Nilkanth,


The framework has a special exception, RetriableException, that enables you to indicate that the operation failed but might succeed if it is retried. In the case of failures you can either choose to handle retrying internally in your connector (but make sure you respect requests to SinkTask.stop!), or throw that exception and have the framework redeliver the data. Note that when you throw this exception, the *same* data will be passed to the next put() call, so you should either fully accept the data (and put it in your own buffer internal to the connector) or not accept any of it.

I wouldn't suggest retrying indefinitely since that can mask failures, but retrying for a short period can help mask transient issues with the other system.
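
A minimal sketch (in Java, against the Connect SinkTask API) of the all-or-nothing variant of this approach. AdsClient, writeBatch, and StoreUnavailableException are hypothetical stand-ins for the target store's client, not real APIs:

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class AdsSinkTask extends SinkTask {
    private AdsClient client;   // hypothetical client for the target data store

    @Override
    public void start(Map<String, String> props) {
        client = new AdsClient(props);   // hypothetical constructor
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            // Write (or internally buffer) the whole batch, or none of it: after a
            // RetriableException the framework redelivers the same records to put().
            client.writeBatch(records);
        } catch (StoreUnavailableException e) {   // hypothetical transient failure
            throw new RetriableException("ADS is unavailable, retry later", e);
        }
    }

    @Override
    public void stop() {
        if (client != null) client.close();   // honor stop() promptly, even when retrying
    }

    @Override
    public String version() {
        return "0.1.0";
    }
}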

-Ewen


Nilkanth Patel

Apr 24, 2017, 12:44:51 AM
to confluent...@googlegroups.com
Thanks Ewen for your reply, this is really helpful.

What I understand from your reply is that, in case of a RetriableException, the Connect framework replays the same Collection<SinkRecord> later, as specified by the retry interval.

Is there any way to replay the records of a specific topic rather than the complete batch (Collection<SinkRecord>), which can include records from multiple topics? I need to address a case where table-1 (corresponding to topic-1) is not available but table-2 (corresponding to topic-2) is available.

Thanks in advance,
Nilkanth.


Ewen Cheslack-Postava

Apr 24, 2017, 11:13:26 PM
to Confluent Platform
On Sun, Apr 23, 2017 at 9:44 PM, Nilkanth Patel <nilkant...@gmail.com> wrote:
> Thanks Ewen for your reply, this is really helpful.
>
> What I understand from your reply is that, in case of a RetriableException, the Connect framework replays the same Collection<SinkRecord> later, as specified by the retry interval.

Correct.

> Is there any way to replay the records of a specific topic rather than the complete batch (Collection<SinkRecord>), which can include records from multiple topics? I need to address a case where table-1 (corresponding to topic-1) is not available but table-2 (corresponding to topic-2) is available.

There isn't support for this today. I think more controlled/restricted delivery of record batches is something we could consider adjusting in the future. This gets a bit more complicated when transformations are added into the mix. Depending on the performance tradeoff, this could potentially even go so far as to deliver 1 record at a time, but this is a pretty major framework-level change that could have a big performance impact for many connectors.

If possible, could you give more info about your use case? It sounds like you're sending data to a non-HA system?

-Ewen
 


Nilkanth Patel

Apr 25, 2017, 9:54:31 AM
to confluent...@googlegroups.com
Thanks Ewen for some more insight.

My connector works well (as expected) when the underlying sink system is up and running.

I am trying to handle the following scenario with the connector I have implemented.

Use-case scenario: The connector is launched and a Kafka producer is producing records into topics, but the complete cluster containing the sink (the data store with the tables) is down or not yet started.
The expectation is that whenever the cluster is started and the corresponding tables are created, the data ingested into all the topics so far should get ingested into the corresponding tables.

As suggested, I have implemented a retry mechanism (based on RetriableException), but I am having an issue when the cluster is started but one of the tables is not yet created. For example, out of a total of 5 tables (table1, table2, ... table5), one table (let's say table5) is not created,
and hence SinkTask.put(Collection<SinkRecord>) throws a RetriableException. The records for all topics are then replayed, even though the records for the available tables have already been inserted into their corresponding tables.

It seems to me that a mechanism like a single task managing a single topic could work here, which is why I asked the earlier question about this.

Can you please advise or give some input on how I can handle the scenario explained above?

Thanks,
Nilkanth.


Ewen Cheslack-Postava

Apr 25, 2017, 6:11:33 PM
to Confluent Platform
On Tue, Apr 25, 2017 at 6:54 AM, Nilkanth Patel <nilkant...@gmail.com> wrote:
> Thanks Ewen for some more insight.
>
> My connector works well (as expected) when the underlying sink system is up and running.
>
> I am trying to handle the following scenario with the connector I have implemented.
>
> Use-case scenario: The connector is launched and a Kafka producer is producing records into topics, but the complete cluster containing the sink (the data store with the tables) is down or not yet started. The expectation is that whenever the cluster is started and the corresponding tables are created, the data ingested into all the topics so far should get ingested into the corresponding tables.
>
> As suggested, I have implemented a retry mechanism (based on RetriableException), but I am having an issue when the cluster is started but one of the tables is not yet created. For example, out of a total of 5 tables (table1, table2, ... table5), one table (let's say table5) is not created, and hence SinkTask.put(Collection<SinkRecord>) throws a RetriableException. The records for all topics are then replayed, even though the records for the available tables have already been inserted into their corresponding tables.

This can happen in other connectors for other reasons (e.g. in HDFS we may have a permissions problem that causes only some of the files to be unwritable).

The key observation here is that *accepting* data in SinkTask.put() does not imply you've *written* that data. That's why there's a separate flush() call. So your options here are:

1. Check that you're going to be able to write the data before attempting to write any of it. If there is some you can't write, throw a RetriableException and try again later. There's no issue with partially writing data since you only write it once you know you can write it all.

2. Accept *all* the data. Generally if you do this you want to make sure you save a reference to the records in your task, i.e. you have some buffer that stores data that hasn't been written yet. Then you start processing those messages, and halfway through, one of them may fail. In this case, if you're going to apply some retries, you aren't *required* to throw RetriableException; you can just return normally. On subsequent put() calls, you can try clearing your buffer before accepting additional data (at that point you would need to throw RetriableException, since the data in that put() call would not have been processed at all). Or, up to some limit, you can continue to accept and buffer data. When flush() happens, you'll just need to block until all the data can be written (or throw an exception if you hit a timeout, in which case offsets will not be committed).
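
A minimal sketch of option 2 (accept and buffer in put(), drain in flush()); AdsClient, isWritable, and writeRecord are hypothetical stand-ins for the target store's API, and the buffer limit is an arbitrary assumption. Option 1 would instead check every topic's table up front and either write the whole batch or throw RetriableException:

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class BufferingAdsSinkTask extends SinkTask {
    private static final int MAX_BUFFERED = 10_000;    // assumed limit, tune for your store
    private final Deque<SinkRecord> buffer = new ArrayDeque<>();
    private AdsClient client;                           // hypothetical store client

    @Override
    public void start(Map<String, String> props) {
        client = new AdsClient(props);                  // hypothetical constructor
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        drainBuffer();                                  // best-effort write of previously buffered records
        if (buffer.size() + records.size() > MAX_BUFFERED)
            throw new RetriableException("Buffer full; ask the framework to redeliver this batch later");
        buffer.addAll(records);                         // accept (but do not yet write) the new batch
    }

    private void drainBuffer() {
        while (!buffer.isEmpty()) {
            SinkRecord next = buffer.peek();
            if (!client.isWritable(next.topic()))       // e.g. the corresponding table does not exist yet
                return;                                 // keep the rest buffered and try again later
            client.writeRecord(next);                   // hypothetical single-record write
            buffer.poll();
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        drainBuffer();
        if (!buffer.isEmpty())                          // simplified: a real task might block/retry up to a timeout first
            throw new ConnectException("Could not write all buffered records; offsets will not be committed");
    }

    @Override
    public void stop() {
        if (client != null) client.close();
    }

    @Override
    public String version() {
        return "0.1.0";
    }
}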

-Ewen

 

Nilkanth Patel

Apr 26, 2017, 6:46:42 AM
to confluent...@googlegroups.com
Thanks Ewen for the information; I will see how I can utilize it in the context of my connector.

One more question. In this case the cluster hosting the sink tables is up and running, with the desired tables created.

When I start my connector I see the following logs on the console where I started it:

[2017-04-26 03:36:11,147] INFO Sink task WorkerSinkTask{id=ampool-sink-connector-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232)
[2017-04-26 03:36:11,620] WARN Error while fetching metadata with correlation id 2 : {table81=LEADER_NOT_AVAILABLE, table82=LEADER_NOT_AVAILABLE, table83=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:707)
[2017-04-26 03:36:11,629] INFO Discovered coordinator quickstart.cloudera:9092 (id: 2147483615 rack: null) for group connect-ampool-sink-connector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:573)
[2017-04-26 03:36:11,667] INFO Revoking previously assigned partitions [] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)
[2017-04-26 03:36:11,667] INFO (Re-)joining group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:407)
[2017-04-26 03:36:11,751] WARN The following subscribed topics are not assigned to any members in the group connect-ampool-sink-connector : [table81, table82, table83]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:357)
[2017-04-26 03:36:11,756] INFO Successfully joined group connect-ampool-sink-connector with generation 20 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:375)
[2017-04-26 03:36:11,760] INFO Setting newly assigned partitions [] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:252)
[2017-04-26 03:36:55,143] INFO Reflections took 47489 ms to scan 563 urls, producing 14103 keys and 91116 values  (org.reflections.Reflections:229)

[2017-04-26 03:41:11,623] INFO Revoking previously assigned partitions [] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)
[2017-04-26 03:41:11,623] INFO (Re-)joining group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:407)
[2017-04-26 03:41:11,631] INFO Successfully joined group connect-ampool-sink-connector with generation 21 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:375)
[2017-04-26 03:41:11,633] INFO Setting newly assigned partitions [table81-0, table82-0, table83-0] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:252)
[2017-04-26 03:42:10,646] INFO WorkerSinkTask{id=ampool-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-04-26 03:42:10,652] WARN Commit of WorkerSinkTask{id=ampool-sink-connector-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)
[2017-04-26 03:44:10,647] INFO WorkerSinkTask{id=ampool-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-04-26 03:44:10,647] WARN Commit of WorkerSinkTask{id=ampool-sink-connector-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)

Records are not getting ingested immediately; it takes some time, i.e. delayed ingestion into the sink tables.

Am I missing some configuration here? Following a Stack Overflow comment I tried setting port = 9092 and advertised.host.name = localhost in server.properties, but that did not work.

Thanks in advance.


Ewen Cheslack-Postava

Apr 26, 2017, 12:59:03 PM
to Confluent Platform
The LEADER_NOT_AVAILABLE errors indicate that data can't even be fetched from your Kafka cluster because there is no leader for some topic partitions. Sometimes this can happen for a short time while leader election is going on, but it should not be persistent. You should check the status of your Kafka cluster for those topics using the kafka-topics command.
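
For example, one possible invocation of the kafka-topics tool shipped with Confluent 3.2 (the topic name and ZooKeeper address here are just placeholders taken from this thread):

bin/kafka-topics --zookeeper localhost:2181 --describe --topic table81

A partition that shows no leader ("none" or -1 in the Leader column) cannot be read from until a leader is elected.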

-Ewen


Nilkanth Patel

Apr 28, 2017, 1:04:00 AM
to confluent...@googlegroups.com
Ewen,

The LEADER_NOT_AVAILABLE errors are one of the issues, and thanks for the insight.

But looking at the log, it seems the log lines related to the delayed ingestion are the "subscribed topics are not assigned to any members in the group" warning, followed by the revoking and (re-)joining messages.

Just see the timing: 03:36:11 --------> 03:41:11, almost 4 minutes of delay, after which I can see records ingested in the sink tables.
[2017-04-26 03:36:11,147] INFO Sink task WorkerSinkTask{id=ampool-sink-connector-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232)
[2017-04-26 03:36:11,620] WARN Error while fetching metadata with correlation id 2 : {table81=LEADER_NOT_AVAILABLE, table82=LEADER_NOT_AVAILABLE, table83=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:707)
[2017-04-26 03:36:11,629] INFO Discovered coordinator quickstart.cloudera:9092 (id: 2147483615 rack: null) for group connect-ampool-sink-connector. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:573)
[2017-04-26 03:36:11,667] INFO Revoking previously assigned partitions [] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)
[2017-04-26 03:36:11,667] INFO (Re-)joining group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:407)
[2017-04-26 03:36:11,751] WARN The following subscribed topics are not assigned to any members in the group connect-ampool-sink-connector : [table81, table82, table83]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:357)
[2017-04-26 03:36:11,756] INFO Successfully joined group connect-ampool-sink-connector with generation 20 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:375)
[2017-04-26 03:36:11,760] INFO Setting newly assigned partitions [] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:252)
[2017-04-26 03:36:55,143] INFO Reflections took 47489 ms to scan 563 urls, producing 14103 keys and 91116 values  (org.reflections.Reflections:229)

[2017-04-26 03:41:11,623] INFO Revoking previously assigned partitions [] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:393)
[2017-04-26 03:41:11,623] INFO (Re-)joining group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:407)
[2017-04-26 03:41:11,631] INFO Successfully joined group connect-ampool-sink-connector with generation 21 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:375)
[2017-04-26 03:41:11,633] INFO Setting newly assigned partitions [table81-0, table82-0, table83-0] for group connect-ampool-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:252)
[2017-04-26 03:42:10,646] INFO WorkerSinkTask{id=ampool-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-04-26 03:42:10,652] WARN Commit of WorkerSinkTask{id=ampool-sink-connector-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)
[2017-04-26 03:44:10,647] INFO WorkerSinkTask{id=ampool-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-04-26 03:44:10,647] WARN Commit of WorkerSinkTask{id=ampool-sink-connector-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask:172)

Any thoughts on how I can get rid of this "subscribed topics are not assigned to any members in the group connect-ampool-sink-connector" warning?

Nilkanth.


Ewen Cheslack-Postava

May 2, 2017, 12:49:55 AM
to Confluent Platform
This sounds like it could be an issue with metadata refresh, because the 4 minutes of delay is pretty close to the default 5 minute metadata refresh interval. The error message about "subscribed topics are not assigned to any members in the group" will persist for 5 minutes if an initial metadata fetch indicates there are 0 usable partitions (and therefore no partitions from that topic can be assigned). In the meantime, if partitions are added, the consumer will not notice those changes until it requests updated metadata, which, as mentioned, happens about every 5 minutes.
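
If the delay really is the consumer metadata refresh, one knob that may help (assuming the standard consumer.-prefixed overrides in the Connect worker properties, which are passed through to the sink tasks' consumers) is lowering the refresh interval:

# in connect-standalone.properties or connect-distributed.properties
consumer.metadata.max.age.ms=60000   # default is 300000 (5 minutes)

Creating the topics, with their partitions, before starting the connector avoids the problem altogether, since the first metadata fetch then already sees assignable partitions.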

-Ewen
