Source offset 'file' parameter is missing


Håkan Etzell

Jun 13, 2024, 4:29:02 AM
to debezium
Hi!

I'm trying to resolve a problem with one of my Debezium connectors (2.5.4) that runs in Kafka Connect (3.6.0). I run about 30 Debezium connectors (to different MySQL instances) and most of them run just fine all the time.

Occasionally one connector fails to start with the error "Source offset 'file' parameter is missing". I have not yet figured out why. In my example below, I recreated the connector just one day before it failed.

My current hypothesis is that the Kafka Connect servers restart due to preemption in our cluster (it's a dev cluster), but that does not explain why just one (or two) connectors fail while the others do not.

Can someone point me in a direction on how to track this down?
   / Håkan

org.apache.kafka.connect.errors.ConnectException: Source offset 'file' parameter is missing
    at io.debezium.connector.mysql.MySqlOffsetContext$Loader.load(MySqlOffsetContext.java:175)
    at io.debezium.connector.mysql.MySqlOffsetContext$Loader.load(MySqlOffsetContext.java:160)
    at io.debezium.connector.common.OffsetReader.lambda$offsets$0(OffsetReader.java:50)
    at java.base/java.util.Collections$SingletonSet.forEach(Collections.java:4905)
    at io.debezium.connector.common.OffsetReader.offsets(OffsetReader.java:45)
    at io.debezium.connector.common.BaseSourceTask.getPreviousOffsets(BaseSourceTask.java:382)
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:98)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:141)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:280)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
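The top frame of the trace corresponds to a simple presence check on the restored offset map: Debezium's MySQL offset loader requires a 'file' (binlog filename) entry before it can resume streaming. A rough Python sketch of that check (Debezium's actual code is Java; this is an illustration only):

```python
# Rough sketch of the check behind "Source offset 'file' parameter is missing"
# (MySqlOffsetContext$Loader.load). Not Debezium's code; illustration only.

def load_mysql_offset(offset: dict) -> dict:
    """Validate a restored MySQL source offset before resuming streaming."""
    binlog_file = offset.get("file")
    if binlog_file is None:
        # An empty or partial stored offset record ({}) trips exactly this error.
        raise ValueError("Source offset 'file' parameter is missing")
    return {
        "file": binlog_file,
        "pos": offset.get("pos", 0),
        "gtids": offset.get("gtids"),
    }

# A healthy offset (binlog file + position) loads fine:
ok = load_mysql_offset({"file": "mysql-bin.001218", "pos": 74152657})

# An empty offset does not:
try:
    load_mysql_offset({})
except ValueError as e:
    print(e)  # Source offset 'file' parameter is missing
```

So whenever the error appears, the stored offset record for that connector key is missing its binlog position, which is why inspecting the offsets topic (as done later in the thread) is the natural next step.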


Chris Cranford

Jun 13, 2024, 7:31:24 AM
to debe...@googlegroups.com
Hi -

I believe we're going to need a bit more information, such as your connector configuration. Also, what version of MySQL are you connecting to, and is this deployed on-prem, RDS, etc.?

Thanks,
Chris

Håkan Etzell

Jun 14, 2024, 6:35:55 AM
to debe...@googlegroups.com
Debezium (2.5.4) connected to a Google Cloud SQL for MySQL instance, MySQL 8.0.32. All of our other instances are on the same version.

Kafka and Kafka Connect are deployed in Google Kubernetes Engine using Strimzi: Kafka 3.6.0, Kafka Connect 3.6.0, Strimzi 0.38.

Configuration (I've omitted a few things around transforms; let me know if that is important):
{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.exclude.list": ".*_data",
    "database.hostname": "${file:/opt/kafka/external-configuration/db-credentials/db-credentials.properties:host}",
    "database.password": "${file:/opt/kafka/external-configuration/db-credentials/db-credentials.properties:password}",
    "database.port": "3006",
    "database.server.id": "4142",
    "database.user": "${file:/opt/kafka/external-configuration/db-credentials/db-credentials.properties:username}",
    "include.schema.changes": "false",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "true",
    "name": "dbz-source-finland-1-test2",
    "notification.enabled.channels": "sink",
    "notification.sink.topic.name": "_debezium_notification",
    "read.only": "true",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-bootstrap:9092",
    "schema.history.internal.kafka.topic": "schemahistory.finland-1-test2",
    "signal.enabled.channels": "kafka",
    "signal.kafka.bootstrap.servers": "kafka-bootstrap:9092",
    "signal.kafka.topic": "_debezium_signal",
    "snapshot.locking.mode": "none",
    "snapshot.mode": "schema_only",
    "table.include.list": ".+\\.account,... and about 30 more tables",
    "tasks.max": "1",
    "time.precision.mode": "connect",
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.compression.type": "snappy",
    "topic.creation.default.partitions": "5",
    "topic.creation.default.replication.factor": "2",
    "topic.prefix": "finland-1",
    "transforms": "ChangesToHeader,Unwrap,Reroute,MapLicense,PartitionRouting",
    "transforms.ChangesToHeader.type": "io.debezium.transforms.ExtractChangedRecordState",
    "transforms.MapLicense.configFile": "/opt/kafka/external-configuration/mapping.properties",
    "transforms.MapLicense.failWhenMissingMap": "true",
    "transforms.MapLicense.fromFieldName": "__dbz__physicalTableIdentifier",
    "transforms.MapLicense.fromFieldValueRegexp": "^finland-(.*\\..*)\\..*",
    "transforms.MapLicense.targetFieldName": "license_id",
    "transforms.MapLicense.targetFieldRequired": "true",
    "transforms.MapLicense.type": "com.brpsystems.smt.MapLicenseId$Key",
    "transforms.PartitionRouting.fieldName": "__dbz__physicalTableIdentifier",
    "transforms.PartitionRouting.fieldSource": "key",
    "transforms.PartitionRouting.partitions": "5",
    "transforms.PartitionRouting.type": "com.brpsystems.smt.PartitionRouter",
    "transforms.Reroute.key.field.name": "__dbz__physicalTableIdentifier",
    "transforms.Reroute.topic.regex": "(.*)\\.(.*)",
    "transforms.Reroute.topic.replacement": "dbz-source.$2",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Unwrap.delete.handling.mode": "none",
    "transforms.Unwrap.drop.tombstones": "false",
    "transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.normalize.schemas": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true"
}

jiri.p...@gmail.com

Jun 14, 2024, 6:37:25 AM
to debezium
Hi,

is it possible there was a change in GTID config on the MySQL server? Have you run an ad-hoc snapshot? Could you please share the contents of offset topic?

Jiri

Håkan Etzell

Jun 14, 2024, 9:56:35 AM
to debe...@googlegroups.com
Hi.

Below is the output from kafkactl consume -b -k connect-cluster-offsets (the # delimits Kafka key and value).

The connector that fails
["dbz-source-finland-1-test2",{"server":"finland-1"}]#{}

An example for an another connector setup (which worked). Just for reference
["dbz-source-finland-2-test2",{"server":"finland-2"}]#{"transaction_id":null,"ts_sec":1718223061,"file":"mysql-bin.001218","pos":74152657,"incremental_snapshot_signal_offset":1662,"gtids":"a849623f-02f1-11eb-877d-42010a503009:1-96145630","row":1,"server_id":616639767,"event":2}

For me it seems quite clear why the first connector failed, but I don't see how an empty JSON value could be posted in the first place.

Do you think creating a tombstone message for this key to reset it, and then snapshotting again, would be a feasible resolution?
    / Håkan




jiri.p...@gmail.com

Jun 18, 2024, 12:38:00 AM
to debezium
Hi,

Yes, I don't understand the empty offset either. Isn't there anything else before the empty one? WRT the tombstone and connector reset - yes, that's the solution.

Jiri
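For anyone hitting the same issue: the tombstone's key must match byte-for-byte the key Connect wrote to the offsets topic (the ["<connector name>",{"server":"<topic.prefix>"}] string seen in the kafkactl output above), with a null value. A minimal sketch that builds such a record, with the names from this thread; the actual produce step (commented out) assumes the confluent-kafka Python client and your broker address:

```python
# Build the tombstone record that clears a broken Connect offset.
# Names below (connector, server, topic, broker) are taken from this thread;
# substitute your own. A None value is a tombstone: after it, the connector
# restarts with no prior offset and runs a fresh schema_only snapshot.

def offset_tombstone(connector: str, server: str) -> tuple:
    # Kafka Connect serializes the offset key as ["<connector>",{"server":"<prefix>"}]
    key = f'["{connector}",{{"server":"{server}"}}]'.encode()
    return key, None  # None value == tombstone

key, value = offset_tombstone("dbz-source-finland-1-test2", "finland-1")
print(key.decode())  # ["dbz-source-finland-1-test2",{"server":"finland-1"}]

# To actually publish it (assumption: confluent-kafka is installed; the
# partitioner must match the Java client so the tombstone lands in the
# same partition as the original key):
#
#   from confluent_kafka import Producer
#   p = Producer({"bootstrap.servers": "kafka-bootstrap:9092",
#                 "partitioner": "murmur2_random"})
#   p.produce("connect-cluster-offsets", key=key, value=value)
#   p.flush()
```

One caveat worth noting: Connect workers cache offsets in memory, so it is safest to delete (or at least stop) the connector before writing the tombstone and only then recreate/restart it.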
