Download Mongodb Kafka Connector !!INSTALL!!

0 views
Skip to first unread message

Joseph Middlebrook

unread,
Jan 21, 2024, 4:44:08 PM1/21/24
to wafertiba

If you don't want to use Confluent Platform you can deploy Apache Kafka yourself - it includes Kafka Connect already. Which plugins (connectors) you use with it is up to you. In this case you would be using Kafka Connect (part of Apache Kafka) plus kafka-connect-mongodb (provided by MongoDB).

a. Download mongodb connector '*-all.jar' from here .Mongodb-kafka connector with 'all' at the end will contain all connector dependencies also.b. Drop this jar file in your kafka's lib folderc. Configure 'connect-standalone_bare.properties' as:

download mongodb kafka connector


Download Zip 🗹 https://t.co/Q8ef4zPxuu



If that JAR doesn't bundle Avro itself, then MSK very likely doesn't include Avro like Confluent Platform does (which I assume Mongo bundled their connector primarily for). At least, Avro is not a dependency of Apache Kafka, so that would explain that error.

I faced same issue, when running on my local. I downloaded the jar(mongo-kafka-connect-1.6.0-confluent.jar) from confluent platform which does not provide uber jar anymore.So I searched for uber jar and found below site from where, I could download uber jar(select all in Download dropdown) and that resolved the issue.

In both cases I can perform complex document transformations because I can use programming language of my choice once a document is consumed, then use an Elasticsearch client in order to insert the document to Elasticsearch. However change streams option seems much more simple. So I'm wondering which use cases justify using a more complex option (Kafka connector) or maybe I'm missing something.

You have a classic trade off between a simpler narrower-purpose solution and a more complex, but more general purpose, solution. Change streams work for streaming changes from MongoDB, but what if you have sources other than MongoDB in the future? Also, I don't know how change streams scale, but kafka topics are designed from the ground up to partition for horizontal scaling, if you find you need to do your processing faster.

In Kafka Connect on Kubernetes, the easy way!, I had demonstrated Kafka Connect on Kubernetes using Strimzi along with the File source and sink connector. This blog will showcase how to build a simple data pipeline with MongoDB and Kafka with the MongoDB Kafka connectors which will be deployed on Kubernetes with Strimzi.

In the config section, we enter the connector config including the MongoDB connection string, database and collection names, whether we want to copy over existing data etc. The topic.prefix attribute is added to database & collection names to generate the name of the Kafka topic to publish data to. e.g. if the database and collection names are test_db, test_coll respectively, then the Kafka topic name will be mongo.test_db.test_coll. Also, the publish.full.document.only is set to true - this means that, only the document which has been affected (created, updated, replaced) will be published to Kafka, and not the entire change stream document (which contains a lot of other info)

As per instructions, if you had created items in the source MongoDB collection, check the kafkacat terminal - you should see the Kafka topic records popping up. Go ahead and add a few more items to the MongoDB collection and confirm that you can see them in the kafkacat consumer terminal

In the config section we need to specify the source Kafka topic (using topics) - this is the same Kafka topic to which the source connector has written the records to. database and collection should be populated with the names of the destination database and collection respectively. Note the post.processor.chain attribute contains com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder - this automatically adds an attribute (topic-partition-offset) to the MongoDB document and captures the Kafka topic, partition and offset values

To start with, the connector copies over existing records in the Kafka topic (if any) into the sink collection. If you had initially created items in source Azure Cosmos DB collection, they should have been copied over to Kafka topic (by the source connector) and subsequently persisted to the sink Azure Cosmos DB collection by the sink connector - to confirm this, query Azure Cosmos DB using any of the methods mentioned previously

As mentioned before, this was a simplified example to help focus on the different components and moving parts e.g. Kafka, Kubernetes, MongoDB, Kafka Connect etc. I demonstrated a use case where the record was modified before finally storing in the sink collection, but there are numerous other options which the connector offers, all of which are config based and do not require additional code (although the there are integration hooks as well). Some of the example include, using custom pipelines in the source connector, [post-processors] ( -connector/current/kafka-sink-postprocessors/#post-processing-of-documents) in the sink connector etc.

The MongoDB source connector uses change streams to capture changes in MongoDB data at set intervals. Rather than directly polling the collection, the connector pulls new changes from a change stream using a query-based approach. You can set the polling interval as a parameter to determine how often changes are emitted from the stream. For a log-based change data capture method, use the Debezium source connector for MongoDB instead.

The connector will write to a topic named DATABASE.COLLECTION so either create the topic in your Kafka service, or enable the auto_create_topic parameter so that the topic will be created automatically.

You can create a source connector taking the students MongoDB collection to Apache Kafka with the following connector configuration, after replacing the placeholders for MONGODB_HOST, MONGODB_PORT, MONGODB_DB_NAME, MONGODB_USERNAME and MONGODB_PASSWORD:

Specifies how the connector should start up when there is no source offset available. Resuming a change stream requires a resume token, which the connector stores as reads from the source offset. If no source offset is available, the connector may either ignore all or some existing source data, or may at first copy all existing source data and then continue with processing new data. Possible values are:

In most instances, the preceding basic configuration properties are sufficient.If you require additional property settings, then specify any of the followingoptional advanced connector configuration properties by selecting Show advanced optionson the Create Connector page:

AVRO (io.confluent.connect.avro.AvroConverter) or JSON (org.apache.kafka.connect.json.JsonConverter)for output with a preset schema. Additionally, you can set Enable Infer Schemas for the value.Each document will be processed in isolation, which may lead to multiple schema definitions for the data.

Debezium does provide a Single Message Transform (SMT) to flatten the MongoDB record out like this, but in using it I hit a bug (DBZ-649) that seems to be down to the MongoDB collection documents having different fields between documents. The reported error was org.apache.kafka.connect.errors.DataException: is not a valid field name.

The connector catalog contains a list of connectors that are supported either by IBM or the community:

  • IBM supported: Support is provided by IBM for customers who have a license for IBM Event Automation or IBM Cloud Pak for Integration. Raise any issues through the official IBM support channel. IBM will investigate, identify, and provide a fix when possible.
  • Community supported: Each community connector is subject to its own set of license terms and not supported by IBM. Raise issues through the community support links provided for each connector.

We are running Kafka Connect in a distributed mode on 3 nodes using Debezium (MongoDB) and Confluent S3 connectors. When adding a new connector via the REST API the connector is created in RUNNING state, but no tasks are created for the connector.

The issue is not caused by the connector plugins, because we see the same behaviour for both Debezium and S3 connectors. Also in debug logs I can see that Debezium is correctly returning a task configuration from the Connector.taskConfigs() method.

Resume feature: the connector has the ability to continue processing from a specific point in time. As per connector docs - "The top-level _id field is used as the resume token which is used to start a change stream from a specific point in time."

We have the first half of the setup using which we can post MongoDB operations details to a Kafka topic. Let's finish the other half which will transform the data in the Kafka topic and store it in a destination MongoDB collection. For this, we will use the Sink connector - here is the definition

i can authenticate into mongo-replica-set by other third party tools like robo3t or another application which i wrote myself. but can't do it with debezium_connect in docker network. also should note that the application i wrote myself has no problem connecting to mongodb in the docker network.

Kafka Connect is an open-source tool to ingest data from data systems (e.g. databases) and to stream changes to data systems. When data is ingested into Kafka a source connector is used, when data is streamed from Kafka sink connector is used. Kafka connectors and by extension Debezium are managed via REST API. We will use a source connector in order to stream database changes to a Kafka topic.

df19127ead
Reply all
Reply to author
Forward
0 new messages