JsonDeserializer with schemas.enable requires "schema" and "payload" fields and may not contain additional fields
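When `schemas.enable=true` is set for a converter (`key.converter.schemas.enable` or `value.converter.schemas.enable`), JsonConverter only accepts a top-level JSON object with exactly two fields, `schema` and `payload`. Anything else fails with the message above. In the older JsonConverter versions this thread appears to use, a null key seems to trip the same check, which is presumably what the thread's NullKeyJsonConverter works around. The sketch below shows the envelope shape for a record with one string field. `EnvelopeSketch`, `envelope()` and the field name are hypothetical, and the escaping is deliberately minimal. If producers write plain JSON instead, setting the relevant `schemas.enable` to `false` removes the envelope requirement.

```java
public class EnvelopeSketch {

    // Minimal JSON string quoting for this demo: escapes backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Hypothetical helper: wraps a one-field string record in the
    // {"schema": ..., "payload": ...} envelope JsonConverter expects with schemas.enable=true.
    // The envelope must not contain any other top-level fields.
    static String envelope(String field, String value) {
        String schema = "{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":"
                + quote(field) + "}],\"optional\":false}";
        String payload = "{" + quote(field) + ":" + quote(value) + "}";
        return "{\"schema\":" + schema + ",\"payload\":" + payload + "}";
    }

    public static void main(String[] args) {
        System.out.println(envelope("name", "example"));
    }
}
```

Running it prints `{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"}],"optional":false},"payload":{"name":"example"}}`.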
import com.fasterxml.jackson.databind.JsonNode
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.connect.data.SchemaAndValue
import org.apache.kafka.connect.errors.DataException
import org.apache.kafka.connect.json.{JsonConverter, JsonDeserializer}

class NullKeyJsonConverter extends JsonConverter {
  // Assumed field: the original snippet does not show where `deserializer` is defined
  private val deserializer = new JsonDeserializer()

  override def toConnectData(topic: String, value: Array[Byte]): SchemaAndValue = {
    val jsonValue: JsonNode =
      try deserializer.deserialize(topic, value)
      catch {
        case e: SerializationException =>
          throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e)
      }

    // Null input: return an empty SchemaAndValue instead of failing the envelope check
    if (jsonValue == null) new SchemaAndValue(null, null)
    else super.toConnectData(topic, value)
  }
}

ERROR Commit of Thread[WorkerSinkTask-mongodb-sink-connector-0,5,main] offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:182)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:857)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:171)
	at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
	at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
[2017-01-02 16:31:28,440] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@1373b9a5 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-02 16:31:28,448] ERROR Error ILLEGAL_GENERATION occurred while committing offsets for group connect-mongodb-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:544)
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/a465cae8-2072-4c26-a4fc-146536842fa9%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>();
    // Split the configured collections and topics into at most maxTasks groups
    List<String> coll = Arrays.asList(collections.split(","));
    int numGroups = Math.min(coll.size(), maxTasks);
    List<List<String>> dbsGrouped = ConnectorUtils.groupPartitions(coll, numGroups);
    List<String> topics = Arrays.asList(this.topics.split(","));
    List<List<String>> topicsGrouped = ConnectorUtils.groupPartitions(topics, numGroups);
    // One config map per task: shared Mongo settings plus that task's collections and topics
    for (int i = 0; i < numGroups; i++) {
        Map<String, String> config = new HashMap<>();
        config.put(PORT, port);
        config.put(BULK_SIZE, bulkSize);
        config.put(HOST, host);
        config.put(DATABASE, database);
        config.put(COLLECTIONS, StringUtils.join(dbsGrouped.get(i), ","));
        config.put(TOPICS, StringUtils.join(topicsGrouped.get(i), ","));
        configs.add(config);
    }
    return configs;
}
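To see how collections and topics get paired per task, here is a standalone sketch of the contiguous split that `ConnectorUtils.groupPartitions` performs: the first `size % numGroups` groups receive one extra element. `groupContiguous` is a hypothetical re-implementation for illustration, not the Kafka class itself, and the collection and topic names are made up. Because collections and topics are grouped independently, task `i` only pairs the intended collection with the intended topic if both comma-separated lists are written in matching order. When there are fewer topics than groups, some tasks end up with an empty `TOPICS` value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaskGroupingSketch {

    // Hypothetical re-implementation of the contiguous split done by ConnectorUtils.groupPartitions:
    // each group gets size / numGroups elements, and the first (size % numGroups) groups get one more.
    static <T> List<List<T>> groupContiguous(List<T> elements, int numGroups) {
        if (numGroups <= 0) {
            throw new IllegalArgumentException("Number of groups must be positive.");
        }
        int perGroup = elements.size() / numGroups;
        int leftover = elements.size() % numGroups;
        List<List<T>> result = new ArrayList<>(numGroups);
        int next = 0;
        for (int g = 0; g < numGroups; g++) {
            int count = g < leftover ? perGroup + 1 : perGroup;
            result.add(new ArrayList<>(elements.subList(next, next + count)));
            next += count;
        }
        return result;
    }

    public static void main(String[] args) {
        // Illustrative values only: three collections, two topics, maxTasks = 4
        List<String> collections = Arrays.asList("c1", "c2", "c3");
        List<String> topics = Arrays.asList("t1", "t2");
        int numGroups = Math.min(collections.size(), 4);

        List<List<String>> collGroups = groupContiguous(collections, numGroups);
        List<List<String>> topicGroups = groupContiguous(topics, numGroups);
        for (int i = 0; i < numGroups; i++) {
            // String.join on an empty group yields "", i.e. a task with no topics
            System.out.println("task " + i + ": collections=" + String.join(",", collGroups.get(i))
                    + " topics=" + String.join(",", topicGroups.get(i)));
        }
    }
}
```

With these illustrative inputs, task 2 is assigned collection `c3` but an empty topic list.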
[2017-01-03 14:11:23,406] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@17c55be8 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-03 14:11:23,684] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@17c55be8 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-03 14:11:38,039] INFO org.apache.kafka.connect.runtime.WorkerSinkTask@17c55be8 Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
[2017-01-03 14:11:38,041] INFO Marking the coordinator 2147483645 dead. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
[2017-01-03 14:11:38,055] ERROR Error ILLEGAL_GENERATION occurred while committing offsets for group connect-mongodb-sink-connector-json1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:544)
[2017-01-03 14:11:38,060] ERROR Commit of Thread[WorkerSinkTask-mongodb-sink-connector-json1-0,5,main] offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:101)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance

bootstrap.servers=localhost:9092
group.id=ConnectTestGroup
key.converter=com.startapp.data.NullKeyJsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.flush.interval.ms=10000
config.storage.topic=connect-configs
Hey Ewen,

Thanks for all the answers and the information! I still have some issues.

I checked the connector's SinkTask, and each put() call runs for a long time (around 10 seconds per call). I'm pretty sure that is what causes the frequent rebalances: the bulk write to Mongo is what takes so long.

How can I make put() receive fewer records per call? What else can I do about it?
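A long put() delays the task's next poll(), and the group coordinator can then treat the member as dead and rebalance, which matches the ILLEGAL_GENERATION and CommitFailedException errors earlier in the thread. Before Kafka 0.10.1 heartbeats were only sent from poll(), so the limit is `session.timeout.ms`; from 0.10.1 on it is `max.poll.interval.ms`. The direct way to get fewer records per put() is the consumer setting `max.poll.records` (added in 0.10.0), which sink tasks pick up from the worker config via the `consumer.` prefix. On 0.9.x, which the `WorkerSinkTaskThread` frames in these traces suggest, `max.partition.fetch.bytes` is the closest knob. The sketch below is only back-of-the-envelope arithmetic for choosing a value: `suggestMaxPollRecords` and every number in `main` are hypothetical, not measurements from this thread.

```java
import java.util.Properties;

public class PollSizingSketch {

    // Hypothetical helper: from an observed put() duration for a batch, estimate how many
    // records fit into a given percentage of the consumer timeout. Integer math, minimum 1.
    static int suggestMaxPollRecords(long observedPutMs, int recordsInBatch,
                                     long timeoutMs, int safetyPercent) {
        if (observedPutMs <= 0 || recordsInBatch <= 0) {
            throw new IllegalArgumentException("need a positive duration and batch size");
        }
        long budgetMs = timeoutMs * safetyPercent / 100;
        long fit = budgetMs * recordsInBatch / observedPutMs;
        return (int) Math.max(1, Math.min(fit, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Illustrative numbers only: a 10 s put() for a 2000-record batch,
        // a 30 s timeout, and a target of using at most 30% of it.
        int maxRecords = suggestMaxPollRecords(10_000, 2_000, 30_000, 30);

        // Worker-level overrides for the sink tasks' consumers use the "consumer." prefix
        // (max.poll.records requires Kafka 0.10.0 or later).
        Properties worker = new Properties();
        worker.setProperty("consumer.max.poll.records", Integer.toString(maxRecords));
        worker.list(System.out);
    }
}
```

Lowering the batch size keeps each put() well inside the timeout at the cost of more, smaller Mongo bulk writes. Raising the timeout instead also works but delays detection of genuinely dead workers.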
In addition, I modified the connector to run with more than one task. It looks good except for one thing: when it runs on a single worker everything seems fine, but when a second worker starts, the first one is sometimes stopped with the error below. The same can happen when two or more workers are running the connector and I kill one of them: the remaining workers may hit the error as well.

[2017-01-08 16:54:47,322] INFO Forcing shutdown of thread WorkerSinkTask-mongo-sink-connector-json-1 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2017-01-08 16:54:47,323] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@3283cfd6 failed. (org.apache.kafka.connect.runtime.Worker:312)
[2017-01-08 16:54:47,324] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
	at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
	at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
	at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.poll(WorkerGroupMember.java:144)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:266)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
	at java.lang.Thread.run(Thread.java:745)

Any idea why this happens?

Thanks again,
Stam
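KafkaConsumer guards each call with an internal acquire() check: it records the calling thread and throws ConcurrentModificationException if a different thread is already inside the consumer. The trace shows the herder thread reaching KafkaConsumer.close() through WorkerSinkTask.close() after the graceful stop failed, presumably while the task thread was still inside a slow put() (the roughly 10-second Mongo bulk write), so the two threads collided. The sketch below models that check with a hypothetical `GuardedClient` rather than a real consumer, and contrasts it with the safer pattern in which other threads only signal a stop and the owning thread closes the client itself. For a real consumer, `KafkaConsumer.wakeup()` is the one method documented as safe to call from another thread. Shortening put() as discussed earlier in the thread should also make the graceful stop less likely to time out.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class SingleOwnerDemo {
    private static final long NO_OWNER = -1L;

    // Hypothetical client modeled loosely on KafkaConsumer's acquire()/release() guard (no ref counting).
    static class GuardedClient {
        private final AtomicLong owner = new AtomicLong(NO_OWNER);

        void acquire() {
            long me = Thread.currentThread().getId();
            if (owner.get() != me && !owner.compareAndSet(NO_OWNER, me)) {
                throw new ConcurrentModificationException("client is not safe for multi-threaded access");
            }
        }

        void release() {
            owner.set(NO_OWNER);
        }
    }

    // Mirrors the trace: another thread tries to close while the task thread is inside a slow put().
    static boolean closeFromOtherThreadFails() throws InterruptedException {
        GuardedClient client = new GuardedClient();
        CountDownLatch inPut = new CountDownLatch(1);
        CountDownLatch finishPut = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            client.acquire();            // task thread now owns the client
            try {
                inPut.countDown();
                finishPut.await();       // stands in for a long bulk write
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                client.release();
            }
        });
        task.start();
        inPut.await();
        boolean rejected;
        try {
            client.acquire();            // "herder" thread attempts close()
            client.release();
            rejected = false;
        } catch (ConcurrentModificationException e) {
            rejected = true;
        }
        finishPut.countDown();
        task.join();
        return rejected;
    }

    // Safer pattern: other threads only request a stop; the owning thread closes the client.
    static boolean ownerClosesAfterStopSignal() throws InterruptedException {
        GuardedClient client = new GuardedClient();
        AtomicBoolean stopRequested = new AtomicBoolean(false);
        AtomicBoolean closedCleanly = new AtomicBoolean(false);
        Thread task = new Thread(() -> {
            while (!stopRequested.get()) {
                client.acquire();        // one poll()/put() iteration
                client.release();
                Thread.yield();
            }
            client.acquire();            // close() runs on the owning thread
            client.release();
            closedCleanly.set(true);
        });
        task.start();
        stopRequested.set(true);         // the other thread only signals
        task.join();
        return closedCleanly.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("close from another thread rejected: " + closeFromOtherThreadFails());
        System.out.println("owner-thread close succeeded: " + ownerClosesAfterStopSignal());
    }
}
```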