MongoDB source to MySQL Sink

Kirk Prybil

Dec 5, 2017, 11:33:40 AM
to debezium
Is there an example of anyone using the Debezium MongoDB source connector to populate a MySQL database as a sink?

We are having trouble wrapping our heads around taking MongoDB CDC update statements and cleanly applying them to a sink database that is MySQL.

Example Source Collection snippet

UserProfile {
    address: { street: "1st Street", city: "Chicago", state: "IL" }
    visits: 50
    ...
}


Example Sink Table snippet

TABLE: USER_PROFILE

COLUMNS:  
....
address_street
address_city
address_state
visits

We can handle the initial sync and op=c events just fine, since they give us the whole record in the "after" field with nested fields that we can map to table columns. Where we run into issues is on the updates: parsing the various $set, $unset, and $inc statements that can appear in the "patch" section and mapping them correctly.
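For the op=c case, the nested-field-to-column mapping we do can be sketched minimally in Python (the function name and the underscore naming convention are our own illustration, not anything Debezium provides):

```python
import json

def flatten(doc, prefix=""):
    """Flatten a nested document into column-name/value pairs,
    joining nested field names with underscores
    (address.street -> address_street)."""
    row = {}
    for key, value in doc.items():
        column = prefix + key
        if isinstance(value, dict):
            row.update(flatten(value, prefix=column + "_"))
        else:
            row[column] = value
    return row

# The "after" field of an op=c event arrives as a JSON string:
after = json.dumps({
    "address": {"street": "1st Street", "city": "Chicago", "state": "IL"},
    "visits": 50,
})
print(flatten(json.loads(after)))
# {'address_street': '1st Street', 'address_city': 'Chicago',
#  'address_state': 'IL', 'visits': 50}
```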

Example statements showing up in "patch" that we have to map (not an exhaustive list of operations):

"patch" : {"$set" : {"address.street" : "2nd Street"}}   - path to field properties type format

"patch" : {"$set" : {"address" :  {"street":"2nd Street"}} - path to field object notation 

"patch": {"$set" : {"address" :  {"street":"2nd Street"}, "$inc": {"visits":10}} - mutiple statements

We are finding that we have to account for every Mongo update operator to do the field mapping, in a Kafka Streams consumer listening to the Debezium MongoDB source CDC output.
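The kind of operator dispatch this forces on us looks roughly like the sketch below (Python; the helper and the "INCREMENT" marker are hypothetical illustrations of ours, not Debezium or Connect code):

```python
def patch_to_column_updates(patch):
    """Translate a Mongo "patch" document into flat column assignments
    for a SQL UPDATE.  None means SET col = NULL; the ("INCREMENT", n)
    marker means SET col = col + n.  Only $set/$unset/$inc are handled;
    every other operator needs its own branch, which is the problem."""
    updates = {}
    for op, fields in patch.items():
        for path, value in fields.items():
            column = path.replace(".", "_")
            if op == "$set":
                if isinstance(value, dict):
                    # object notation: one level of nesting flattened here
                    for sub_key, sub_val in value.items():
                        updates[column + "_" + sub_key] = sub_val
                else:
                    updates[column] = value
            elif op == "$unset":
                updates[column] = None
            elif op == "$inc":
                updates[column] = ("INCREMENT", value)
            else:
                raise ValueError("unhandled operator: " + op)
    return updates

print(patch_to_column_updates(
    {"$set": {"address.street": "2nd Street"}, "$inc": {"visits": 10}}))
# {'address_street': '2nd Street', 'visits': ('INCREMENT', 10)}
```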

Is there a better way to attack this? 

Thanks,

Kirk
Jiri Pechanec

Dec 5, 2017, 10:47:46 PM
to debezium
Hi,

could you please look at https://github.com/debezium/debezium/pull/336? I believe this is the solution to your problem.

J.

Kirk Prybil

Dec 6, 2017, 9:07:07 AM
to debezium
Jiri,

I read through the PR but need some additional clarity on exactly what is enabled and how.

Is there a new version of Mongo CDC connector coming out?

Are there settings, configs, or code to adjust on the Debezium MongoDB source connector side?

Are there settings/config adjustments on the Confluent jdbc connector side?

Does someone have an example of MongoDB Source to MySQL sink?

Sorry, we are new to both Debezium and Kafka Connect as CDC capabilities and are trying to come up to speed as quickly as possible.

Thanks,

Kirk

Jiri Pechanec

Dec 6, 2017, 10:32:54 AM
to debezium
Hi,

we expect there will be a new version of Debezium available next week. The format coming from MongoDB CDC will stay the same. The change I pointed you to is a single message transform (SMT) that will be available OOTB in Debezium - https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect

You can apply this transformation either at the MongoDB source, so that only the simplified format reaches the Kafka broker, or at the MySQL sink, where the event messages are stored in the Kafka broker in the default format and are transformed into a JDBC-compatible format just before being sent to the MySQL sink. The original issue (https://issues.jboss.org/browse/DBZ-409) was intended for the Elasticsearch sink, but the JDBC sink expects the same format.
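For reference, wiring an SMT into a connector is just a pair of properties in the connector config; the same two lines (shown here with the existing UnwrapFromEnvelope class, and the alias "unwrap" is arbitrary) go into either the source or the sink config, which is what determines where the transformation runs:

```json
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
}
```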

J.

makenz...@gmail.com

Dec 7, 2017, 3:10:06 PM
to debezium


We've started planning out the source/sink configs based on the Customers example on the Debezium site, with the MongoDB customers collection as the source and a JDBC sink.
If the goal (which pull request 336 is going to enable) is to go from a MongoDB source to a MySQL JDBC sink:

What would the connector configs need to be changed to for the source? 

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "rs0/mongodb:27017",
    "mongodb.name": "dbserver1",
    "mongodb.user": "debezium",
    "mongodb.password": "dbz",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092"
  }
}

And for the sink?

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
  }
}

So, in short: in which connector config will the new MongoDB SMT need to be referenced, and how?



Also, are we able to download and build this new functionality ourselves before the potential release next week?

Thank you in advance.

makenz...@gmail.com

Dec 7, 2017, 3:13:09 PM
to debezium
Also, will the $set and $unset idempotent operations brought through by the MongoDB source connector be handled by the new Mongo SMT?

Thanks in advance.

Jiri Pechanec

Dec 7, 2017, 11:50:58 PM
to debezium
Hi,

you can try building Debezium from the PR - https://github.com/debezium/debezium/pull/336; here is how to check it out locally - https://help.github.com/articles/checking-out-pull-requests-locally/

Looking in the code I'd say '$set' is supported but not '$unset'.

"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", - this line will get changed, the new class will be io.debezium.connector.mongodb.transforms.MongoDataConverter

Also please look at http://debezium.io/blog/2017/09/25/streaming-to-another-database/, section 'Topic Naming', as you will probably need to do something similar.

It would be great if you could give the SMT a try before it is merged so we can incorporate early feedback. For fast communication just join https://gitter.im/debezium/dev

Have a nice day

J.