Kafka connect MongoDB

Dwijadas Dey

Oct 22, 2019, 4:47:22 AM
to mongodb-user

Hi all,
I am trying to push events from a Kafka topic to a MongoDB collection. I am using the sink connector from https://github.com/mongodb/mongo-kafka. The properties file looks like the following:

connect-distributed.properties
--------------------------------------------
bootstrap.servers=kfk-bro1:9092,kfk-bro2:9092,kfk-bro3:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
rest.host.name=kfkc-wkr2
rest.port=8085
rest.advertised.port=8085
plugin.path=/u01/cnfkc/confluent-kfk/share/java/kafka-connect-mongodb,/u01/cnfkc/confluent-kfk/share/java/kafka-connect-jdbc


iptables/firewall is stopped across the cluster. I started the distributed Connect process using the following command:

$  connect-distributed /u01/cnfkc/confluent-kfk/etc/kafka/connect-distributed.properties  & echo $! > /u01/cnfkc/confluent-kfk/etc/kafka/connect-distributed.pid &


The REST port 8085 is listening and I can reach it:

$  netstat  -pltn | grep 8085
tcp        0      0 ::ffff:172.16.10.158:8085   :::*                        LISTEN      7327/java

I verified that the connector plugin has been loaded:

curl -s http://kfkc-wkr2:8085/connector-plugins | jq .
[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "0.2"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "0.2"
  },
...
...

Submitted a connector using the following REST call:

curl -X POST -H "Content-Type: application/json" -d ' {
    "name": "mongodb_sink",
    "config": {
               "connector.class" : "com.mongodb.kafka.connect.MongoSinkConnector",
               "topics" : "stream",
               "connection.uri" : "mongodb://mongo:27017",
               "database" : "Test",
               "collection" : "fromKafka",
               "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
               "key.converter.schemas.enable" : "false",
               "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
               "value.converter.schemas.enable" : "false"
            }
}' http://kfkc-wkr2:8085/connectors | jq .


Once I submitted the above connector, I got the following error:

{
  "error_code": 500,
  "message": "IO Error trying to forward REST request: java.net.NoRouteToHostException: No route to host"
}

With the following stack trace:

java.util.concurrent.ExecutionException: java.net.NoRouteToHostException: No route to host
        at org.eclipse.jetty.client.util.FutureResponseListener.getResult(FutureResponseListener.java:118)
        at org.eclipse.jetty.client.util.FutureResponseListener.get(FutureResponseListener.java:101)
        at org.eclipse.jetty.client.HttpRequest.send(HttpRequest.java:683)
        at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:89)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.completeOrForwardRequest(ConnectorsResource.java:269)
        at org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:108)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

...
...
Caused by: java.net.NoRouteToHostException: No route to host
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.eclipse.jetty.io.SelectorManager.doFinishConnect(SelectorManager.java:355)
        at org.eclipse.jetty.io.ManagedSelector.processConnect(ManagedSelector.java:206)
        at org.eclipse.jetty.io.ManagedSelector.access$1400(ManagedSelector.java:60)

Can anyone point out what the possible issue could be? I can ping every system in the cluster and reach the REST port from each of them.
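
For context: the stack trace passes through ConnectorsResource.completeOrForwardRequest, which suggests that the worker receiving the POST tried to forward it to another worker's REST endpoint. Reachability therefore has to hold between the workers themselves, not only from the client. A minimal sketch of such a check, assuming bash and the coreutils timeout command are available; can_reach is a hypothetical helper name, and any worker name other than kfkc-wkr2 would be a placeholder:

```shell
# can_reach HOST PORT
# Succeeds if a TCP connection to HOST:PORT can be opened within 3 seconds.
# Uses bash's /dev/tcp pseudo-device, so bash is invoked explicitly.
can_reach() {
    timeout 3 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
}

# Run on each Connect worker, once per other worker in the cluster, e.g.:
# can_reach kfkc-wkr2 8085 && echo "reachable" || echo "NOT reachable"
```

A failure here from one worker to another, while the same port is reachable from a client machine, would point at host-level filtering on the workers.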

Dwijadas Dey

Oct 22, 2019, 5:20:59 AM
Hi, this was indeed an iptables issue. The error went away after completely stopping the firewall, but I am now facing another issue: the MongoDB sink connector class (com.mongodb.kafka.connect.MongoSinkConnector) is not found at run time, even though I can list it using curl -s http://kfkc-wkr2:8085/connector-plugins | jq .

{
  "error_code": 500,
  "message": "Failed to find any class that implements Connector and which name matches com.mongodb.kafka.connect.MongoSinkConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='5.2.1', encodedVersion=5.2.1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='5.2.1', encodedVersion=5.2.1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class io.debezium.connector.mongodb.MongoDbConnector, name='io.debezium.connector.mongodb.MongoDbConnector', version='0.10.0.Beta4', encodedVersion=0.10.0.Beta4, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=source, typeName='source', location='classpath'}, 
PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='2.2.0-cp2', encodedVersion=2.2.0-cp2, type=source, typeName='source', location='classpath'}"
}

Dwijadas Dey

Oct 22, 2019, 9:11:35 AM
Hi,
I have fixed the error by installing the plugin on the other nodes in the cluster. I had configured the MongoDB connector JAR on a single node only. Once I configured the MongoDB connector JAR on the other nodes as well, the configuration I posted earlier works flawlessly.

Maybe this helps others.
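
A rough sketch of how the same problem could be spotted up front, by asking every worker's /connector-plugins endpoint whether it lists the connector class. This is only an illustration: has_plugin and check_workers are hypothetical helper names, and the worker addresses passed in are placeholders apart from kfkc-wkr2:8085.

```shell
CLASS="com.mongodb.kafka.connect.MongoSinkConnector"

# has_plugin CLASSNAME
# Reads the JSON returned by /connector-plugins on stdin and succeeds
# if the quoted class name appears in it (fixed-string match).
has_plugin() {
    grep -qF "\"$1\""
}

# check_workers HOST:PORT...
# Prints one line per worker saying whether it lists $CLASS.
check_workers() {
    for worker in "$@"; do
        if curl -s "http://$worker/connector-plugins" | has_plugin "$CLASS"; then
            echo "$worker: plugin found"
        else
            echo "$worker: plugin MISSING"
        fi
    done
}

# Example (worker names other than kfkc-wkr2 are placeholders):
# check_workers kfkc-wkr1:8085 kfkc-wkr2:8085 kfkc-wkr3:8085
```

Any worker reported as MISSING would need the connector JAR placed under its own plugin.path and a restart of the Connect process on that node.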


Paddu Raj

Oct 23, 2019, 1:35:34 PM
I have set up a Kafka consumer/producer, the Mongo sink plugins, and kafkacat. Could you please help me with the steps to push data from MongoDB to a Kafka topic? Thanks in advance.