Expected behaviour of a Kafka sink connector for the period when the sink destination is not working properly


Andrey Rybinski

Jun 16, 2017, 5:04:52 AM
to Confluent Platform
Hi, I have some problems with the Kafka Connect InfluxDB sink connector. It is maintained by Data Mountaineer, and I have addressed questions to them as well.

But I also have some questions about the basic working principles of Kafka sink connectors.

Am I right that if for some reason it is not possible to transfer messages to the sink destination (e.g. the DB is down and the task status of the related connector changes to FAILED), then once the problem is solved (e.g. the DB is running again) and the connector is restarted so that the task status changes back to RUNNING, all Kafka messages that were produced during the outage should be transferred to the sink destination?

I found the following in the Kafka Connect documentation:

offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions, be replicated, and be configured for compaction

Will anything be stored in the offset topic if there is only one Kafka broker in the cluster and the topics are not created before running the sink connector?

Thanks in advance!

Randall Hauch

Jun 16, 2017, 9:30:33 AM
to confluent...@googlegroups.com
On Fri, Jun 16, 2017 at 4:04 AM, Andrey Rybinski <andrey....@gmail.com> wrote:
Hi, I have some problems with the Kafka Connect InfluxDB sink connector. It is maintained by Data Mountaineer, and I have addressed questions to them as well.

But I also have some questions about the basic working principles of Kafka sink connectors.

Am I right that if for some reason it is not possible to transfer messages to the sink destination (e.g. the DB is down and the task status of the related connector changes to FAILED), then once the problem is solved (e.g. the DB is running again) and the connector is restarted so that the task status changes back to RUNNING, all Kafka messages that were produced during the outage should be transferred to the sink destination?

Yes.
 

I found the following in the Kafka Connect documentation:

offset.storage.topic (default connect-offsets) - topic to use for storing offsets; this topic should have many partitions, be replicated, and be configured for compaction

Will anything be stored in the offset topic if there is only one Kafka broker in the cluster and the topics are not created before running the sink connector?

You need to either manually create the 3 storage topics (for offsets, configs, and status) per the documentation before starting your workers, or set up Kafka's topic auto-creation with those settings. Note that in the latter case, the broker-wide auto-creation settings these 3 topics need may not be what you want for other auto-created topics, so manually creating them is preferred; a sketch follows.
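For reference, a minimal sketch of creating the three internal topics with the kafka-topics CLI. The topic names match the worker defaults, the partition counts follow the documentation's recommendations, a local ZooKeeper at localhost:2181 is assumed, and a replication factor of 1 only makes sense for a single-broker demo cluster (use 3 or more in production):

 kafka-topics --zookeeper localhost:2181 --create --topic connect-offsets \
   --partitions 25 --replication-factor 1 --config cleanup.policy=compact
 kafka-topics --zookeeper localhost:2181 --create --topic connect-configs \
   --partitions 1 --replication-factor 1 --config cleanup.policy=compact
 kafka-topics --zookeeper localhost:2181 --create --topic connect-status \
   --partitions 5 --replication-factor 1 --config cleanup.policy=compact

Note that the config topic must have exactly one partition; the other partition counts can vary.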

Kafka Connect 0.11.0.0 will have a new feature that allows Kafka Connect to create these internal topics. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-154+Add+Kafka+Connect+configuration+properties+for+creating+internal+topics for details.
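For example, with KIP-154 the worker configuration gains properties along these lines (a sketch; the replication factors of 1 again assume a single-broker demo):

 # New in Kafka Connect 0.11.0.0 (KIP-154)
 offset.storage.partitions=25
 offset.storage.replication.factor=1
 config.storage.replication.factor=1
 status.storage.partitions=5
 status.storage.replication.factor=1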

Best regards,

Randall
 

Thanks in advance!


Andrey Rybinski

Jun 18, 2017, 9:42:00 AM
to Confluent Platform
I tried manually creating the 3 storage topics (for offsets, configs, and status) per the documentation (the only difference is that I could not set the replication factor to 2, because my demo had only one Kafka broker in the cluster). That did not fix the problem.

However, here is some info from the logs of the InfluxDB connector:

 [2017-06-15 22:58:04,100] WARN Commit of WorkerSinkTask{id=influxdb-sink-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
 [2017-06-15 22:59:04,099] INFO Empty list of records received. (com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask)
 [2017-06-15 22:59:04,100] INFO WorkerSinkTask{id=influxdb-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
 [2017-06-15 22:59:04,100] WARN Commit of WorkerSinkTask{id=influxdb-sink-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
 [2017-06-15 23:00:04,099] INFO Empty list of records received. (com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask)
 [2017-06-15 23:00:04,099] INFO WorkerSinkTask{id=influxdb-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
 [2017-06-15 23:00:04,099] WARN Commit of WorkerSinkTask{id=influxdb-sink-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)
 [2017-06-15 23:01:04,099] INFO Empty list of records received. (com.datamountaineer.streamreactor.connect.influx.InfluxSinkTask)
 [2017-06-15 23:01:04,099] INFO WorkerSinkTask{id=influxdb-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
 [2017-06-15 23:01:04,099] WARN Commit of WorkerSinkTask{id=influxdb-sink-0} offsets timed out (org.apache.kafka.connect.runtime.WorkerSinkTask)



I thought this might already be fixed in the latest Confluent Kafka Connect Docker image, so I ran the following test with a MySQL database and the Confluent JDBC sink connector.

I combined knowledge from the following articles:



and wrote some data to the Kafka topic while the DB was up, then stopped the DB Docker container, wrote some data while the DB was down, then started the DB container again and restarted the related connector.
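In shell terms the test looked roughly like this (a sketch; the container name mysql-demo and connector name jdbc-sink are placeholders, and the REST port matches the worker config below):

 docker stop mysql-demo      # take the sink destination down
 # ... produce a few records to the Kafka topic while the DB is down ...
 docker start mysql-demo     # bring the destination back up
 # restart the connector via the Connect REST API
 curl -X POST http://localhost:38083/connectors/jdbc-sink/restart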

All the data from the Kafka topic was transferred to the DB (the behaviour I expected from the InfluxDB connector, which currently does not work this way).

One strange thing is that when I subscribed to the topic configured as the offset topic for the connector, it was empty (see the consumer command after the config below).

kafka-connect.properties:

rest.port=38083
config.storage.topic=quickstart-avro-sink-config
group.id=quickstart-avro-sink
log4j.root.loglevel=DEBUG
key.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
offset.storage.topic=quickstart-avro-sink-offsets
internal.key.converter.schemas.enable=false
bootstrap.servers=localhost:29092
value.converter=io.confluent.connect.avro.AvroConverter
status.storage.topic=quickstart-avro-sink-status
value.converter.schema.registry.url=http://localhost:8081
internal.value.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
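
One way to subscribe to the offset topic and confirm it is empty (a sketch, using the console consumer and the bootstrap address from the config above; the exact command I used may differ):

 kafka-console-consumer --bootstrap-server localhost:29092 \
   --topic quickstart-avro-sink-offsets --from-beginning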

I have some questions related to this:

1) Does it mean that the related info was stored in, and later read from, the __consumer_offsets topic? (See the sketch below for one way to check.)
2) Is the fix for bug KAFKA-4942 not yet included in the latest Confluent Kafka Connect Docker image?
3) If it is included, is the assumed correct behaviour to write the info to the specified Connect offset topic (quickstart-avro-sink-offsets in my example)?
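
One way to check question 1 is to describe the connector's consumer group: sink tasks consume under a group named connect-<connector-name>, so the group name below is a placeholder built from a hypothetical connector name jdbc-sink (older CLI versions may also require the --new-consumer flag):

 kafka-consumer-groups --bootstrap-server localhost:29092 \
   --describe --group connect-jdbc-sink

If committed offsets show up there, they are being tracked in __consumer_offsets rather than in the Connect offset topic.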
