Beyond the Kafka brokers, there are a few deployment options to consider. Understanding and acting on these options helps ensure that your Kafka Connect deployment scales and supports the long-term needs of your data pipeline.
Although Schema Registry is an optional service for Kafka Connect, it enables you to easily use Avro, Protobuf, and JSON Schema as common data formats for the Kafka records that connectors read from and write to. This keeps the need to write custom code to a minimum and standardizes your data in a flexible format. You also get the added benefit of schema evolution and enforced compatibility rules. For more information, see Using Kafka Connect with Schema Registry and Configuring key and value converters.
Distributed mode is also more fault tolerant. For example, if a node unexpectedly leaves the cluster, Kafka Connect distributes the work of that node to other nodes in the cluster. And, because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster where it is safely replicated, losing the node where a Connect worker runs does not result in any loss of data.
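As a rough illustration of what redistributing a lost node's work means, the following toy sketch hands each task of a failed worker to whichever surviving worker currently has the fewest tasks. The least-loaded policy, the function name reassign_tasks, and the worker and task names are all assumptions made for this example; it is not the rebalance protocol that Kafka Connect actually implements.

```python
def reassign_tasks(assignments, failed_worker):
    """Move the failed worker's tasks onto the surviving workers.

    Toy least-loaded policy for illustration only; this is NOT
    Kafka Connect's actual rebalance protocol.
    """
    survivors = sorted(w for w in assignments if w != failed_worker)
    if not survivors:
        raise ValueError("no surviving workers to take over the tasks")
    # Copy the task lists so the caller's mapping is left untouched.
    result = {w: list(assignments[w]) for w in survivors}
    for task in assignments.get(failed_worker, []):
        # Pick the survivor with the fewest tasks (ties broken by name).
        target = min(survivors, key=lambda w: (len(result[w]), w))
        result[target].append(task)
    return result


# Hypothetical workers and tasks.
before = {
    "worker-a": ["jdbc-source-0", "jdbc-source-1"],
    "worker-b": ["s3-sink-0"],
    "worker-c": ["s3-sink-1", "s3-sink-2"],
}
after = reassign_tasks(before, failed_worker="worker-c")
print(after)
```

No task is lost in the sketch because the assignment itself is the only state being moved; in a real cluster the corresponding configurations and offsets live in Kafka rather than on the failed node.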
Connect workers operate well in containers and managed environments, such as Kubernetes, Apache Mesos, Docker Swarm, or YARN. The distributed worker stores all state in Kafka, which makes a cluster easier to manage. By design, Kafka Connect does not handle restarting or scaling workers, so your existing cluster management solution can continue to be used transparently. Note that the standalone worker stores its state on the local file system.
Kafka Connect workers are JVM processes that can run on shared machines with sufficient resources. Hardware requirements for Connect workers are similar to those of standard Java producers and consumers. Resource requirements mainly depend on the types of connectors operated by the workers. More memory is required for environments where large messages are sent and where a large number of messages are buffered before being written in aggregate form to an external system. Using compression continuously requires a more powerful CPU.
If you have multiple workers running concurrently on a single machine, ensure you know the resource limits (CPU and memory). Start with the default heap size setting and monitor internal metrics and the system. Verify the CPU, memory, and network (10 GbE or greater) are sufficient for the load.
Kafka Connect is designed to be extensible so developers can create custom connectors, transforms, and converters, and users can install and run them. This section will help you with installing Connect plugins.
A Kafka Connect plugin is a set of JAR files containing the implementation of one or more connectors, transforms, or converters. Connect isolates each plugin from the others so that libraries in one plugin are not affected by the libraries in any other plugin. This is very important when mixing and matching connectors from multiple providers.
A Kafka Connect plugin should never contain any libraries provided by the Kafka Connect runtime. Kafka Connect finds the plugins using a plugin path defined as a comma-separated list of directory paths in the plugin.path worker configuration property. The following shows an example plugin.path worker configuration property:

    plugin.path=/usr/local/share/kafka/plugins

To install a plugin, you must place the plugin directory or uber JAR (or a symbolic link that resolves to one of these) in a directory already listed in the plugin path. Alternatively, you can update the plugin path by adding the absolute path of the directory containing the plugin. Using the previous plugin path example, you would create a /usr/local/share/kafka/plugins directory on each machine running Connect and then place the plugin directories (or uber JARs) there.
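Because the plugin path is a comma-separated list and each listed directory has to exist on every machine running Connect, a small pre-flight check can catch a missing directory before the worker starts. The following is a minimal sketch, not part of Kafka Connect: the helper names parse_plugin_path and missing_plugin_dirs are hypothetical, and the second directory in the sample value is made up for illustration.

```python
from pathlib import Path


def parse_plugin_path(value):
    """Split a plugin.path value into a list of directory paths."""
    return [Path(p.strip()) for p in value.split(",") if p.strip()]


def missing_plugin_dirs(value):
    """Return the plugin.path entries that are not existing directories."""
    return [p for p in parse_plugin_path(value) if not p.is_dir()]


# Hypothetical value; /opt/connectors is an assumed second entry.
plugin_path = "/usr/local/share/kafka/plugins, /opt/connectors"
for directory in missing_plugin_dirs(plugin_path):
    print(f"plugin.path entry not found on this machine: {directory}")
```

Running the check on each Connect machine reports any directory that still needs to be created before plugins are placed there.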
When you start your Connect workers, each worker discovers all connector, transform, and converter plugins found inside the directories on the plugin path. When you use a connector, transform, or converter, the Connect worker loads the classes from the respective plugin first, followed by the Kafka Connect runtime and Java libraries. Connect explicitly avoids all of the libraries in other plugins. This prevents conflicts and makes it very easy to add and use connectors and transforms developed by different providers.
Earlier versions of Kafka Connect required a different approach to installing connectors, transforms, and converters. All the scripts for running Connect recognized the CLASSPATH environment variable. You would export this variable to define the list of paths to the connector JAR files. The following example shows an older CLASSPATH export variable mechanism:
Confluent does not recommend exporting the CLASSPATH environment variable, because using this method to create a path to plugins can result in library conflicts that can cause Kafka Connect and connectors to fail. Instead, use the plugin.path configuration property, which properly isolates each plugin from other plugins and libraries.
As described in Installing Connect Plugins, connector plugin JAR files are placed in the plugin path (Connect worker property: plugin.path). However, a few connectors may require that you additionally export the CLASSPATH to the plugin JAR files when starting the connector (export CLASSPATH=). While not recommended, CLASSPATH is required for these connectors because Kafka Connect uses classloading isolation to distinguish between system classes and regular classes, and some plugins load system classes (for example, javax.naming and others in the package javax). An example error message showing this issue is provided below. If you see an error that resembles the example below, in addition to adding the plugin path, you must also export CLASSPATH= when starting the connector.
The following sections provide information about running workers in standalone mode and distributed mode. For a list of worker configuration properties, see Kafka Connect Worker Configuration Properties.
Standalone mode is typically used for development and testing, or for lightweight, single-agent environments, for example, sending web server logs to Kafka. The following example shows a command that launches a worker in standalone mode:

    bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]

The first parameter (worker.properties) is the worker configuration properties file. Note that worker.properties is an example file name. You can use any valid file name for your worker configuration file. This file gives you control over settings such as the Kafka cluster to use and the serialization format. For an example configuration file that uses Avro and Schema Registry in standalone mode, open the file located at etc/schema-registry/connect-avro-standalone.properties. You can copy and modify this file for use as your standalone worker properties file.
The second parameter (connector1.properties) is the connector configuration properties file. All connectors have configuration properties that are loaded with the worker. As shown in the example, you can launch multiple connectors using this command.
Distributed mode does not have any additional command-line parameters other than loading the worker configuration file. New workers either start a new group or join an existing one with a matching group.id. Workers then coordinate with the consumer groups to distribute the work to be done.
For an example distributed mode configuration file that uses Avro and Schema Registry, open etc/schema-registry/connect-avro-distributed.properties. You can make a copy of this file, modify it, and use it as the new worker.properties file. Note that worker.properties is an example file name. You can use any valid file name for your properties file.
In standalone mode, connector configuration property files are added as command-line parameters. However, in distributed mode, connectors are deployed and managed using a REST API request. To create connectors, you start the worker and then make a REST request to create the connector. REST request examples are provided in many supported connector documents. For instance, see the Azure Blob Storage Source connector REST-based example.
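To make the REST-based flow concrete, the following sketch builds the request that creates a connector: a POST to the /connectors endpoint with a JSON body containing the connector name and its configuration. It uses only the Python standard library. The worker address http://localhost:8083, the connector name, the file, and the topic are assumptions chosen for illustration, and the request is built but not sent so that the example works without a running worker.

```python
import json
import urllib.request


def build_create_connector_request(base_url, name, config):
    """Build (but do not send) a POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical connector name, file, and topic, for illustration only.
request = build_create_connector_request(
    "http://localhost:8083",
    "local-file-source",
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/tmp/test.txt",
        "topic": "connect-test",
    },
)

# To submit it to a running distributed worker, uncomment:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Any worker in the cluster can receive the request, so the base URL only needs to point at one reachable worker.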
Note that if you run many distributed workers on one host machine for development and testing, the listeners configuration property must be unique for each worker. This property sets the address and port on which the REST interface listens for HTTP requests.
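One simple way to keep the listeners value unique on a shared development host is to derive each worker's port from its index. The sketch below assumes plain HTTP on localhost and a starting port of 8083; the helper name local_worker_listeners is hypothetical, and each generated line would go into a different worker's properties file.

```python
def local_worker_listeners(count, base_port=8083, host="localhost"):
    """Return one listeners setting per worker, each on its own port."""
    return [f"listeners=http://{host}:{base_port + i}" for i in range(count)]


# One line per worker properties file.
for line in local_worker_listeners(3):
    print(line)
```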
Connect stores connector and task configurations, offsets, and status in several Kafka topics. These are referred to as Kafka Connect internal topics. It is important that these internal topics have a high replication factor, a compaction cleanup policy, and an appropriate number of partitions.
Kafka Connect can create the internal topics when it starts up, using the Connect worker configuration properties to specify the topic names, replication factor, and number of partitions for these topics. Connect verifies that the properties meet the requirements and creates all topics with a compaction cleanup policy.
Distributed workers that are configured with matching group.id values discover each other and form a Kafka Connect cluster. All workers in the cluster use the same three internal topics to share connector configurations, offset data, and status updates. For this reason, all distributed worker configurations in the same Connect cluster must have matching config.storage.topic, offset.storage.topic, and status.storage.topic properties.
In addition to the three required internal topic names, the distributed worker configurations should have identical values for the properties that set the replication factor and number of partitions of these topics. This ensures that any worker in the cluster will create missing internal topics with the desired property values. Note that these configuration properties have practical default values.
As each distributed worker starts up, it uses the internal Kafka topics if they already exist. If not, the worker tries to create the topics using the worker configuration properties. This gives you the option of manually creating the topics before starting Kafka Connect, if you require topic-specific settings or when Kafka Connect does not have the necessary privileges to create the topics. If you do create the topics manually, follow the guidelines provided in the list of configuration properties.
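Since workers in one cluster must agree on group.id and the internal topic settings, it can help to compare the worker properties files before starting them. The following is a minimal sketch under stated assumptions: the replication factor and partition property names in SHARED_KEYS are my assumption about which topic-creation settings are meant, the parser handles only simple key=value lines, and the sample values (including the deliberate typo in the second worker) are hypothetical.

```python
SHARED_KEYS = (
    "group.id",
    "config.storage.topic",
    "offset.storage.topic",
    "status.storage.topic",
    # Assumed to be the topic-creation settings that should also match.
    "config.storage.replication.factor",
    "offset.storage.replication.factor",
    "offset.storage.partitions",
    "status.storage.replication.factor",
    "status.storage.partitions",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        props[key.strip()] = value.strip()
    return props


def find_mismatches(worker_configs):
    """Return {key: set of values} for shared keys whose values differ."""
    mismatches = {}
    for key in SHARED_KEYS:
        values = {cfg.get(key) for cfg in worker_configs}
        if len(values) > 1:
            mismatches[key] = values
    return mismatches


# Hypothetical worker files; worker_2 has a typo in offset.storage.topic.
worker_1 = parse_properties("""
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
listeners=http://localhost:8083
""")
worker_2 = parse_properties("""
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offset
status.storage.topic=connect-statuses
listeners=http://localhost:8084
""")
print(find_mismatches([worker_1, worker_2]))
```

The listeners property is intentionally left out of the comparison, because it is expected to differ between workers on the same host.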