Kafka Connect - Kafka Connect Mongodb Issues


stam...@gmail.com

unread,
Jan 2, 2017, 11:33:14 AM1/2/17
to Confluent Platform
Hi,

I am trying to work with Kafka Connect and the MongoDB sink connector: https://github.com/DataReply/kafka-connect-mongodb

To start, I tried it in standalone mode (one topic, Kafka 0.9.0.0).
With Avro and the Schema Registry it ran fine. The first issue appeared when I tried to use it with JSON (schemas.enable = true):

I created a topic for JSON and produced events with a schema and payload. When I ran the connector I got this error:
JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields

My JSON contained only schema and payload fields, so I looked into why the error still occurred and found that, for each record in the Kafka topic, JsonConverter's toConnectData is called twice: the first time with a null byte array and the second time with the value byte array. To work around it I created MyJsonConverter, which extends JsonConverter, and added the following code:

override def toConnectData(topic: String, value: Array[Byte]): SchemaAndValue = {
  val jsonValue: JsonNode =
    try {
      deserializer.deserialize(topic, value)
    } catch {
      case e: SerializationException =>
        throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e)
    }

  if (jsonValue == null) new SchemaAndValue(null, null)
  else super.toConnectData(topic, value)
}

  
1. Why does this happen, and how should I fix it?

The second issue is that standalone mode isn't fast enough, so I tried distributed mode, where I hit two more problems:

2. This connector uses one task per topic, so the second worker does nothing.

3. In distributed mode I see the following error from time to time:
ERROR Commit of Thread[WorkerSinkTask-mongodb-sink-connector-0,5,main] offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:182)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2017-01-02 16:31:28,440] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@1373b9a5 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-02 16:31:28,448] ERROR Error ILLEGAL_GENERATION occurred while committing offsets for group connect-mongodb-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:544)
Does anyone know why this happens?

Thanks,
Stam

Ewen Cheslack-Postava

unread,
Jan 2, 2017, 2:32:22 PM1/2/17
to Confluent Platform
1. Presumably the first call with null bytes is the key being converted. The JsonConverter with schemas enabled expects the envelope format to always be there, even for nulls. This is a limitation of having to ship the schema with the data itself -- we'd lose some schema info if we just stored the null value.
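To make that concrete, with schemas.enable=true the JsonConverter expects every message, keys and values alike, to carry the envelope. A minimal illustration (the schema shown is made up):

```json
{
  "schema": {"type": "string", "optional": true},
  "payload": "some value"
}
```

Even a null must arrive as the full envelope, e.g. with "payload": null, rather than as a bare null byte array.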

If you're not using keys at all, you could just use a different converter (even a simple dummy one you implement yourself that just returns null), although I'm not sure how the Mongo connector uses the keys. Alternatively, the Mongo connector *might* support schema-free data (i.e. use schemas.enable=false). I would not be surprised if it did given Mongo is a document database.
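A dummy key converter along those lines might look like the sketch below. To keep the example self-contained it uses a stand-in SchemaAndValue class rather than Kafka's org.apache.kafka.connect.data.SchemaAndValue; a real implementation would implement the org.apache.kafka.connect.storage.Converter interface and be configured as key.converter:

```java
import java.util.Map;

// Sketch of a "null key" converter that ignores key bytes entirely.
// Stand-in types are used so the example compiles on its own; a real
// version would implement org.apache.kafka.connect.storage.Converter.
public class NullKeyConverter {

    // Stand-in for org.apache.kafka.connect.data.SchemaAndValue.
    public static final class SchemaAndValue {
        public final Object schema;
        public final Object value;
        public SchemaAndValue(Object schema, Object value) {
            this.schema = schema;
            this.value = value;
        }
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    // Whatever bytes arrive (even malformed JSON), report "no key".
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return new SchemaAndValue(null, null);
    }

    // Keys are never serialized on the way out.
    public byte[] fromConnectData(String topic, Object schema, Object value) {
        return null;
    }
}
```

With the real interface implemented, this would be set as key.converter in the worker properties.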

2. The framework can't do much if the connector doesn't provide any parallelism. However, I'm a bit surprised it only generates one task per topic. Sink connectors effectively get parallelism automatically, without any extra implementation work by the connector developer, since they leverage Kafka's consumer group functionality. If the connector is not generating up to tasks.max tasks, it should be a trivial patch to make it do so.

3. This just means that we decided to try to commit at the same time a rebalance started. If you expect things to be in a steady state, I'd look into this more if it happens too frequently (once everything is started, rebalances should be pretty rare), but if it's not happening too frequently it is likely just a matter of bad timing.

-Ewen

--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/a465cae8-2072-4c26-a4fc-146536842fa9%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

stam...@gmail.com

unread,
Jan 2, 2017, 4:53:21 PM1/2/17
to Confluent Platform
Hi, Thanks for the answers!

1. I thought the key might be the problem; thanks for confirming.

2. This is the connector's taskConfigs:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>();
    List<String> coll = Arrays.asList(collections.split(","));
    int numGroups = Math.min(coll.size(), maxTasks);
    List<List<String>> dbsGrouped = ConnectorUtils.groupPartitions(coll, numGroups);
    List<String> topics = Arrays.asList(this.topics.split(","));
    List<List<String>> topicsGrouped = ConnectorUtils.groupPartitions(topics, numGroups);

    for (int i = 0; i < numGroups; i++) {
        Map<String, String> config = new HashMap<>();
        config.put(PORT, port);
        config.put(BULK_SIZE, bulkSize);
        config.put(HOST, host);
        config.put(DATABASE, database);
        config.put(COLLECTIONS, StringUtils.join(dbsGrouped.get(i), ","));
        config.put(TOPICS, StringUtils.join(topicsGrouped.get(i), ","));
        configs.add(config);
    }
    return configs;
}
collections is the list of Mongo collections, and each collection maps to one topic. So when I use one topic and one collection, it doesn't matter how many tasks I configure in tasks.max. How can I modify it?
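One possible direction, sketched under the assumption that each task can resolve the topic-to-collection mapping on its own: since the Connect framework's consumer group already spreads topic partitions across sink tasks, taskConfigs can simply emit up to tasks.max identical configs instead of one per collection. An untested, self-contained illustration (the "topics" and "collections" keys stand in for the connector's real config constants):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {

    // Emit maxTasks identical task configs; Kafka's consumer group then
    // assigns topic partitions across the tasks, so even a single topic
    // can be parallelized up to its partition count.
    public static List<Map<String, String>> taskConfigs(
            int maxTasks, String topics, String collections) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>();
            config.put("topics", topics);           // every task sees all topics
            config.put("collections", collections); // task maps topic -> collection itself
            configs.add(config);
        }
        return configs;
    }
}
```

Parallelism is still bounded by the partition count of the input topics, so a one-partition topic gains nothing from extra tasks.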

3. This issue happens a lot, roughly once a minute.

Stam

stam...@gmail.com

unread,
Jan 3, 2017, 9:35:03 AM1/3/17
to Confluent Platform
More information about issue 3:

Before the CommitFailedException there are other INFO and ERROR messages that I can't understand:

[2017-01-03 14:11:23,406] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@17c55be8 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-03 14:11:23,684] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@17c55be8 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-03 14:11:38,039] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@17c55be8 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-03 14:11:38,041] INFO Marking the coordinator 2147483645 dead. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)

[2017-01-03 14:11:38,055] ERROR Error ILLEGAL_GENERATION occurred while committing offsets for group connect-mongodb-sink-connector-json1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:544)
[2017-01-03 14:11:38,060] ERROR Commit of Thread[WorkerSinkTask-mongodb-sink-connector-json1-0,5,main] offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance


What are those errors?

When I look in Kafka Manager, it appears that the Connect process does nothing with the connect-offsets topic but does use the connect-configs topic. Is that OK?

My distributed configuration:

bootstrap.servers=localhost:9092

group.id=ConnectTestGroup

key.converter=com.startapp.data.NullKeyJsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true


internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets


config.storage.topic=connect-configs


Thanks,
Stam

Ewen Cheslack-Postava

unread,
Jan 3, 2017, 3:52:57 PM1/3/17
to Confluent Platform
Re: the collections, if you only have 1 then it's likely you cannot parallelize this at all. The JDBC connector has a similar limitation -- a table is the finest granularity that you can split work up by. Depending on how the connector queries for new data, you'd probably have to modify it substantially to get finer-grained parallelism.

Re: the commit error, it sounds like the consumer group is going through rebalances very frequently for some reason. Could you take a look at some metrics for the group to verify? The join-rate and heartbeat-latency would probably both be interesting to look at; last-heartbeat-seconds-ago might also be useful.

-Ewen


stam...@gmail.com

unread,
Jan 4, 2017, 4:38:18 AM1/4/17
to Confluent Platform
I'm not sure I understand correctly. I'm using a sink connector, so the connector gets new data from the Connect framework; I thought that limitation applied only to source connectors.
Am I missing something?

Stam

Ewen Cheslack-Postava

unread,
Jan 4, 2017, 5:46:20 PM1/4/17
to Confluent Platform
I was just looking at the taskConfigs() you included. It has this:

List<String> coll = Arrays.asList(collections.split(","));
int numGroups = Math.min(coll.size(), maxTasks);

I was just saying that if you use that list of collections to generate the tasks and there is only one of them, the way the connector is currently written will limit the number of groups to 1.

You're right that this is normally only a limitation for source connectors (sink connectors normally can scale to however many partitions the input topics have), but this particular connector is generating tasks with a limitation based on the # of collections for some reason.

-Ewen


stam...@gmail.com

unread,
Jan 8, 2017, 12:14:45 PM1/8/17
to Confluent Platform
Hey Ewen,

Thanks for all the answers and the information! I still have some issues.

I checked the connector's SinkTask and it looks like each put() call runs for a long time (something like 10 seconds per call). I'm pretty sure that's what causes the frequent rebalances; the bulk write to Mongo is what takes so long.
How can I make put() receive fewer records per call? What else can I do about it?

In addition, I modified the connector to run with more than one task. It looks good except for one thing: on a single worker it works fine, but when a second worker starts (or when two or more workers are running the connector and I kill one, the remaining workers can hit the error too), sometimes the first one stops with this error:

[2017-01-08 16:54:47,322] INFO Forcing shutdown of thread WorkerSinkTask-mongo-sink-connector-json-1 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2017-01-08 16:54:47,323] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@3283cfd6 failed. (org.apache.kafka.connect.runtime.Worker:312)
[2017-01-08 16:54:47,324] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
        at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
        at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
        at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.poll(WorkerGroupMember.java:144)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:266)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
        at java.lang.Thread.run(Thread.java:745)

Any idea why this happens?

Thanks again,
Stam

Ewen Cheslack-Postava

unread,
Jan 9, 2017, 3:04:36 AM1/9/17
to Confluent Platform
On Sun, Jan 8, 2017 at 9:14 AM, <stam...@gmail.com> wrote:
Hey Ewen,

Thanks for all the answers and the information! I still have some issues.

I checked the SinkTask of the connector and it looks like its running for a long time for each put method call - (something like 10 seconds for each put call). I'm pretty sure that what cause the frequently rebalance issue. The bulk writing to mongo is what taking so long.
How can I make the put method get less records per call? What else can I do with it?

You have a few options to alleviate this:

* You can override the maximum number of records returned by polling by putting consumer.max.poll.records=N in your worker configs, where N is a number that is small enough not to trigger the timeout, but large enough that you don't suffer from effectively synchronous message processing.
* Given the short timeout, I'm guessing you aren't, but if you're on 0.10.1.0 or newer, the consumer now has a background heartbeat thread. You can adjust consumer.max.poll.interval.ms to allow more time between consumer polls (while still heartbeating in the background).
* If you're on <= 0.10.0.x, you can increase consumer.session.timeout.ms. This will mean it will take longer to discover a hard failure (i.e. crash without handling the exception, power failure, etc), but will provide more time for processing.
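In worker-properties terms, those options would look something like this (the values are illustrative and need tuning; note the version requirement per line):

```properties
# 0.10.0+: cap how many records each poll (and thus each put()) receives
consumer.max.poll.records=100
# 0.10.1+: allow more time between polls; heartbeats continue in the background
consumer.max.poll.interval.ms=600000
# <= 0.10.0.x: give slow put() calls more headroom before the group evicts the member
consumer.session.timeout.ms=60000
```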

All that said, 10s sounds awfully long for it to be processing a single put(). I'd check with the authors whether they are processing some messages synchronously.
 

In addition, I modified the connector to run with more than one task, It looks good except one thing: when it execute on one worker it looks like it works fine, but when the second worker is starting (or when the two or more workers running the connector and i kill one of the workers the others workers might come up with the error as well), sometimes the first one is stopped with this error:


Any idea why does it happens? 

Multiple processes shouldn't cause this error (unless you were only running with standalone mode before). The more likely culprit is moving to multiple tasks per worker. 0.9.0.0 was the first release of Connect and it has since seen substantial refinement and stabilization, although I don't think we've seen this particular issue reported. It looks like the worker group is rebalancing and in the process it needs to shut down the sink tasks, so it goes to close the consumer. However, the main processing thread for the sink task is still doing some operation (presumably polling for data).

A bunch of stuff around how tasks are managed has been fixed & improved. From what I can tell this is not possible in current versions.

-Ewen
 

Thanks again,
Stam


stam...@gmail.com

unread,
Jan 9, 2017, 9:02:04 AM1/9/17
to Confluent Platform
I tried the consumer.max.poll.records=N configuration but it didn't affect the number of records (maybe it's a newer-version configuration).
I found the consumer.max.partition.fetch.bytes=X configuration; after adding it to my workers, everything runs without errors (both problems solved).
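For anyone else on 0.9.x (which predates max.poll.records), the workaround amounts to capping the fetch size in the worker properties; the value below is illustrative:

```properties
# 0.9.x: no consumer.max.poll.records yet, so limit the bytes fetched per
# partition per request, which indirectly limits records per put() call
consumer.max.partition.fetch.bytes=262144
```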

Thanks a lot for the help!

Stam

  


Birender Singh

unread,
Oct 6, 2017, 2:43:14 AM10/6/17
to Confluent Platform
Hi Ewen and Stam,
While trying the Mongo sink connector I'm getting the following error (log below). I'm unable to find a solution; please help.

INFO ConnectorConfig values:
    connector.class = org.radarcns.mongodb.MongoDbSinkConnector
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    name = kafka-connector-mongodb-sink
    tasks.max = 1
    transforms = null
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-05 23:36:46,573] INFO EnrichedConnectorConfig values:
    connector.class = org.radarcns.mongodb.MongoDbSinkConnector
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    name = kafka-connector-mongodb-sink
    tasks.max = 1
    transforms = null
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-05 23:36:46,574] INFO TaskConfig values:
    task.class = class org.radarcns.mongodb.MongoDbSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-05 23:36:46,575] INFO Instantiated task kafka-connector-mongodb-sink-0 with version 0.11.0.0-cp1 of type org.radarcns.mongodb.MongoDbSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-05 23:36:46,602] INFO ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = connect-kafka-connector-mongodb-sink
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-10-05 23:36:46,658] INFO Kafka version : 0.11.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-05 23:36:46,659] INFO Kafka commitId : 5cadaa94d0a69e0d (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-05 23:36:46,661] INFO Created connector kafka-connector-mongodb-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-05 23:36:46,664] INFO 0 have been processed (org.radarcns.mongodb.MongoDbSinkTask:56)
[2017-10-05 23:36:46,665] ERROR Task kafka-connector-mongodb-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
java.lang.NoSuchMethodError: org.apache.kafka.common.config.AbstractConfig.<init>(Ljava/util/Map;)V
    at org.radarcns.mongodb.MongoDbSinkTask.start(MongoDbSinkTask.java:77)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2017-10-05 23:36:46,666] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:149)
[2017-10-05 23:36:46,666] INFO Stopping MongoDBSinkTask (org.radarcns.mongodb.MongoDbSinkTask:162)
[2017-10-05 23:36:46,666] INFO Stopped MongoDBSinkTask (org.radarcns.mongodb.MongoDbSinkTask:184)

Konstantine Karantasis

unread,
Oct 10, 2017, 1:10:44 PM10/10/17
to confluent...@googlegroups.com

Sounds like a classloading problem due to dependency conflicts.

Are you using "plugin.path", or are all your connectors embedded in the CLASSPATH?
Also, make sure the connector does not bring in any Apache Kafka Connect jars (that is, any Connect framework classes).
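As an illustration (available in Connect 0.11.0 and later; the path shown is made up), classloader isolation is enabled in the worker properties rather than via CLASSPATH:

```properties
# each subdirectory or uber-jar under this path gets its own classloader,
# so connector dependencies can't clash with the framework's
plugin.path=/opt/kafka-connect/plugins
```

With this set, the connector jar goes under that directory and is removed from CLASSPATH.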

Konstantine


Birender Singh

unread,
Oct 11, 2017, 3:11:37 AM10/11/17
to Confluent Platform
Hi Konstantine,
Yes, I have the jar embedded in the CLASSPATH, using export CLASSPATH=<path to the jar "kafka-connect-mongodb-sink-0.1.jar">.

Birender

Konstantine Karantasis

unread,
Oct 11, 2017, 5:06:02 PM10/11/17
to confluent...@googlegroups.com

Exactly.

I'd suggest using plugin isolation if possible, depending on the Connect version you are running.

More here:

Konstantine
