Kafka Connect internal topics


Zainal A

Dec 28, 2020, 6:20:38 PM
to Confluent Platform
Hi, I have a question regarding the internal topics used by Kafka Connect.
Per the docs I read at https://docs.confluent.io/platform/current/connect/userguide.html#kconnect-internal-topics, especially these two bullet points:
  • Connect clusters cannot share Group IDs or internal topics. Simply changing a group.id will not create a new worker separate from an existing Connect cluster. The new group.id must also have unique internal topics associated with it. This requires setting unique config.storage.topic, offset.storage.topic, and status.storage.topic configuration properties for the new group.id.
  • You also must use different connector names than those used in the existing Connect cluster since a consumer group is created based on the connector name. Each connector in a Connect cluster shares the same consumer group.
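The second bullet can be made concrete: by default, Kafka Connect places a sink connector's tasks in a consumer group named connect- followed by the connector name, independent of the worker group.id. The sketch below models only that naming rule; the helper names are hypothetical, and any per-connector consumer overrides are ignored. It shows that two sink connectors with the same name (for example, one in each of two Connect clusters) pointed at the same Kafka cluster end up in the same consumer group.

```python
# Minimal sketch of Kafka Connect's default sink consumer-group naming.
# Assumption: no per-connector consumer overrides are configured.


def sink_consumer_group(connector_name: str) -> str:
    """Return the default consumer group joined by a sink connector's tasks."""
    return "connect-" + connector_name


def shared_consumer_groups(connector_names):
    """Map each consumer group used by more than one connector to those connectors.

    Connectors listed together here would compete for partitions in one group.
    """
    groups = {}
    for name in connector_names:
        groups.setdefault(sink_consumer_group(name), []).append(name)
    return {group: names for group, names in groups.items() if len(names) > 1}


# Same connector name, e.g. one sink connector per Connect cluster:
print(shared_consumer_groups(["my-sink", "my-sink"]))  # {'connect-my-sink': ['my-sink', 'my-sink']}
# Distinct connector names:
print(shared_consumer_groups(["my-sink-dc1", "my-sink-dc2"]))  # {}
```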
My understanding is that each Kafka Connect job (i.e. a sink connector reading from a given source topic) needs its own config/offset/status topics. Is that correct?

Right now I have two Kafka Connect sink jobs, each reading from a different source Kafka topic (call them topic A and topic B). The two jobs use the same config/offset/status topics and the same connector name, though they differ in group.id. According to the docs, this scenario will not work, right?

From my understanding, if I want to share the same config/offset/status internal topics (i.e. I don't want to create many Kafka topics) between two different jobs reading different Kafka source topics, then I need to use different connector names. Is that correct?

I would really appreciate it if someone could comment on this, as I am still learning the intricacies of Connect.

Thank you!

Robin Moffatt

Jan 4, 2021, 6:28:58 AM
to confluent...@googlegroups.com
Hi, 

You need a unique set of config/offset/status topics per Kafka Connect *cluster* when running it in Distributed mode. 

If you have two connectors (jobs) running on the same cluster then they can use a single set of internal topics. It's only if you wanted to run them on separate clusters that you'd need a second set of internal topics. 

A Kafka Connect distributed cluster can be a worker on a single machine, or workers spread across multiple machines (for capacity & redundancy). You could even run two clusters on a single machine if you really wanted to by running two worker processes each configured as its own cluster - although off the top of my head I can't think why you'd do that except for prototyping/dev environments. 
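As a minimal sketch of that rule, assuming two distributed-mode Connect clusters backed by one Kafka cluster: the property names below (bootstrap.servers, group.id, config.storage.topic, offset.storage.topic, status.storage.topic) are real worker settings, while the hostnames, topic names and the check_clusters and to_properties helpers are hypothetical. Other required worker settings, such as the converters, are left out. All workers of one cluster share identical values; a second cluster gets its own group.id and its own three topics.

```python
# Sketch: worker settings for two separate distributed-mode Connect clusters
# that share one Kafka cluster. Hostnames and topic names are hypothetical.

INTERNAL_TOPIC_KEYS = ("config.storage.topic", "offset.storage.topic", "status.storage.topic")

cluster_dc1 = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "connect-cluster-dc1",
    "config.storage.topic": "connect-dc1-configs",
    "offset.storage.topic": "connect-dc1-offsets",
    "status.storage.topic": "connect-dc1-status",
}

cluster_dc2 = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "connect-cluster-dc2",
    "config.storage.topic": "connect-dc2-configs",
    "offset.storage.topic": "connect-dc2-offsets",
    "status.storage.topic": "connect-dc2-status",
}


def check_clusters(*clusters):
    """List conflicts between clusters: a shared group.id or shared internal topics."""
    problems = []
    for i in range(len(clusters)):
        for j in range(i + 1, len(clusters)):
            a, b = clusters[i], clusters[j]
            if a["group.id"] == b["group.id"]:
                problems.append(f"clusters {i} and {j} share group.id {a['group.id']!r}")
            shared = {a[k] for k in INTERNAL_TOPIC_KEYS} & {b[k] for k in INTERNAL_TOPIC_KEYS}
            for topic in sorted(shared):
                problems.append(f"clusters {i} and {j} share internal topic {topic!r}")
    return problems


def to_properties(cfg):
    """Render one cluster's settings in the key=value form of a worker .properties file."""
    return "\n".join(f"{key}={value}" for key, value in cfg.items())


print(check_clusters(cluster_dc1, cluster_dc2))  # []

# Changing only group.id (the setup the docs warn about) still shares the topics:
misconfigured_dc2 = {**cluster_dc1, "group.id": "connect-cluster-dc2"}
for problem in check_clusters(cluster_dc1, misconfigured_dc2):
    print(problem)

print(to_properties(cluster_dc1))
```

Two connectors on the same cluster need none of this: they only need different connector names, and both use that cluster's internal topics.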

Hope that helps - let me know if there's anything that's still unclear. 


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff




Zainal A

Jan 4, 2021, 12:12:37 PM
to Confluent Platform
Hi Robin,
Thanks a lot for responding to my question.
We have a single Kafka cluster stretched across 2 datacenters (spread deployment), and for that Kafka cluster I created one set of config/offset/status topics.
Then I tried to run two connectors (jobs), one in each datacenter, both using the internal topics above.
Due to my lack of knowledge, I gave both jobs the same connector name but a different group.id and a different Kafka source topic (these are sink connectors), hoping to get two different connectors running.
Per https://docs.confluent.io/platform/current/connect/userguide.html#kconnect-internal-topics:

  • Connect clusters cannot share Group IDs or internal topics. Simply changing a group.id will not create a new worker separate from an existing Connect cluster. The new group.id must also have unique internal topics associated with it. This requires setting unique config.storage.topic, offset.storage.topic, and status.storage.topic configuration properties for the new group.id.
  • You also must use different connector names than those used in the existing Connect cluster since a consumer group is created based on the connector name. Each connector in a Connect cluster shares the same consumer group.
Per the bullet points above, my understanding is that I cannot use the same connector name with a different group.id to achieve what I am looking for, and should instead give each connector a different name. Is that correct?

Also, have you seen this error before? Earlier I thought this error happened because I used the same connector name with the same set of internal topics, which may have messed them up :-(

Uncaught exception in REST call to /connectors/<connector-name>/tasks/8/restart
java.lang.IllegalArgumentException: "uriTemplate" parameter is null.
        at org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:189)
        at org.glassfish.jersey.uri.internal.JerseyUriBuilder.uri(JerseyUriBuilder.java:72)
        at javax.ws.rs.core.UriBuilder.fromUri(UriBuilder.java:120)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:258)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:293)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.restartTask(ConnectorsResource.java:213)
        at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
...
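The trace comes from the worker's REST handler for restarting a single task (ConnectorsResource.restartTask). For reference, here is a minimal sketch of building that same call with Python's standard library. The endpoint path POST /connectors/<connector-name>/tasks/<task-id>/restart is the one in the log; the worker URL (8083 is Kafka Connect's default REST port), the connector name and the helper name are assumptions for illustration. It does not work around the exception; it only shows what the request looks like. In distributed mode, a worker that cannot serve the request itself may forward it to the leader, which is the completeOrForwardRequest step in the trace.

```python
import urllib.parse
import urllib.request


def build_task_restart_request(worker_url: str, connector: str, task_id: int) -> urllib.request.Request:
    """Build (but do not send) POST /connectors/<name>/tasks/<id>/restart.

    Hypothetical helper; worker_url and connector are caller-supplied assumptions.
    """
    name = urllib.parse.quote(connector, safe="")  # percent-encode the connector name
    url = f"{worker_url.rstrip('/')}/connectors/{name}/tasks/{task_id}/restart"
    return urllib.request.Request(url, method="POST")


req = build_task_restart_request("http://localhost:8083", "my-sink", 8)
print(req.get_method(), req.full_url)  # POST http://localhost:8083/connectors/my-sink/tasks/8/restart

# Sending it requires a running Connect worker:
# with urllib.request.urlopen(req) as resp:
#     print(resp.status)
```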

Robin Moffatt

Jan 4, 2021, 12:25:09 PM
to confluent...@googlegroups.com
Let's take a step back for a moment :-) 

1. How many Kafka Connect workers do you have? In one data center or both? 

2. What are you trying to achieve here? Is it streaming data from one topic in the Kafka cluster to a target sink but with fault tolerance? Or data from two topics to different sinks? 

With that explained, it will be easier to answer your questions. 




Zainal A

Jan 4, 2021, 1:33:18 PM
to Confluent Platform
My answers below:
1. One connector job in each datacenter (we have 2 datacenters, so 2 connect jobs in total)
2. Yes, each connect job (one per datacenter) consumes from a different source topic, but both use the same internal config/offset/status topics

Regarding the exception I've seen, it looks the same as https://issues.apache.org/jira/browse/KAFKA-9883, but I am not 100% sure, since I still see the exception.
I need to investigate more, but let me know if you have seen this exception before and know a workaround. Thanks a lot!

Robin Moffatt

Jan 4, 2021, 5:02:26 PM
to confluent...@googlegroups.com
When you say "Connect job" are you referring to a Kafka Connect worker (the JVM process), or a connector (one or many of which run in a Kafka Connect cluster)? 




Zainal A

Jan 4, 2021, 6:30:05 PM
to Confluent Platform
Hi Robin, I mean a connector, so there are two connectors in total.
After reading the bullet points in that doc again, I realized something:

  • Connect clusters cannot share Group IDs or internal topics. Simply changing a group.id will not create a new worker separate from an existing Connect cluster. The new group.id must also have unique internal topics associated with it. This requires setting unique config.storage.topic, offset.storage.topic, and status.storage.topic configuration properties for the new group.id.
  • You also must use different connector names than those used in the existing Connect cluster since a consumer group is created based on the connector name. Each connector in a Connect cluster shares the same consumer group.
I think I missed earlier that it specifically says "Connect cluster". In my case, since I run each connector in a different datacenter, I actually have two separate Connect clusters (one per datacenter), so having both connectors share the same connector name and the same config/offset/status internal topics SHOULD work, right?

By the way, I noticed that the offset internal topic is always empty. Do you know whether Kafka Connect uses that internal topic at all? If not, how does it resume from the last offset? In my experiments, when I stop and start the job, it knows which data from the source topic has already been processed, and yet that internal topic is still empty.
I am trying to decode the information stored in the other internal topics; I wish there were more docs about how these internal topics are used and how to leverage them for troubleshooting. Thanks!
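On the offsets question: sink connectors commit their progress as consumer offsets of the connector's consumer group (connect-<connector name> by default), and offset.storage.topic is written only by source connectors, which would explain why that topic stays empty when only sink connectors run. For the config and status topics, records are keyed by plain strings. The sketch below classifies such keys using the prefixes that Apache Kafka's KafkaConfigBackingStore and KafkaStatusBackingStore write (connector-, task-, commit-, target-state-, session-key, status-connector-, status-task-); treat that list as an assumption to check against your Kafka version. The classify_key helper is hypothetical, and reading the raw records (for example with kafka-console-consumer and --property print.key=true) is left out.

```python
# Sketch: classify record keys found in Kafka Connect's config and status topics.
# Assumption: key prefixes as written by Apache Kafka's KafkaConfigBackingStore
# and KafkaStatusBackingStore; verify against the version you run.

PREFIXES = [
    # (prefix, meaning, key ends with "-<task number>")
    ("status-connector-", "connector status", False),
    ("status-task-", "task status", True),
    ("target-state-", "connector target state", False),
    ("connector-", "connector config", False),
    ("commit-", "task config commit", False),
    ("task-", "task config", True),
]


def classify_key(key: str):
    """Return (kind, connector, task_id) for an internal-topic record key."""
    if key == "session-key":
        return ("session key", None, None)
    for prefix, kind, has_task in PREFIXES:
        if key.startswith(prefix):
            rest = key[len(prefix):]
            if has_task:
                # Connector names may contain '-', so split on the last one.
                connector, _, task = rest.rpartition("-")
                if task.isdigit():
                    return (kind, connector, int(task))
                return ("unknown", None, None)
            return (kind, rest, None)
    return ("unknown", None, None)


for k in ["connector-my-sink", "task-my-sink-0", "commit-my-sink",
          "status-task-my-sink-8", "target-state-my-sink"]:
    print(k, "->", classify_key(k))
```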

Zainal A

Jan 19, 2021, 10:15:52 AM
to Confluent Platform
Hi Robin,
Apologies for bringing this thread back up, but I just want to confirm a few points to solidify my understanding.
1. With the same connector name and group id, connectors can use the same internal topics
2. With a different connector name and group id, each connector should use different internal topics
3. Point 2 above means that connectors with different connector names and group ids cannot share the same internal topics

Are these points correct?
From my experiments, it looks like I cannot run different connect jobs (i.e. different connector names and group ids) that share internal topics. Is that true?

Thanks,
zainal