database.history.kafka.topic=mysql-dev.da_testing_db.testcdc
when I list the topic, i see the following:
bin/kafka-topics.sh --list --zookeeper localhost:2181
mysql-dev-01
mysql-dev-01.da_testing_db.testcdc
__consumer_offsets
schema-changes.da_testing_db
testcdc
when I run bin/connect-standalone.sh config/connect-standalone.properties config/connect-mysql.properties, I can see millions of the following lines:
[2018-09-27 09:22:58,078] WARN Skipping invalid database history record '{
"schema" : {
"type" : "struct",
"fields" : [ {
"type" : "struct",
"fields" : [ {
"type" : "int64",
"optional" : false,
"field" : "id"
}, {
"type" : "int64",
"optional" : true,
"field" : "parent_id"
}, {
"type" : "int16",
"optional" : true,
"field" : "operator_id"
}, {
"type" : "int32",
"optional" : true,
"field" : "player_id"
}, {
"type" : "int16",
"optional" : true,
"field" : "game_id"
}, {
"type" : "int16",
"optional" : true,
"field" : "platform"
}, {
"type" : "string",
"optional" : true,
"field" : "currency"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "pay_amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "6",
"connect.decimal.precision" : "13"
},
"field" : "sales_contribution_amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "sales_rtp_contribution_amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "promotion_amount"
}, {
"type" : "int16",
"optional" : true,
"field" : "bet_status"
}, {
"type" : "int16",
"optional" : true,
"field" : "bet_type"
}, {
"type" : "int16",
"optional" : true,
"field" : "transaction_type"
}, {
"type" : "int16",
"optional" : true,
"field" : "game_stage"
}, {
"type" : "int64",
"optional" : true,
"name" : "io.debezium.time.MicroTimestamp",
"version" : 1,
"field" : "create_time"
}, {
"type" : "int64",
"optional" : true,
"name" : "io.debezium.time.MicroTimestamp",
"version" : 1,
"field" : "update_time"
}, {
"type" : "boolean",
"optional" : true,
"field" : "is_deduct"
} ],
"optional" : true,
"name" : "mysql_dev_01.da_testing_db.testcdc.Value",
"field" : "before"
}, {
"type" : "struct",
"fields" : [ {
"type" : "int64",
"optional" : false,
"field" : "id"
}, {
"type" : "int64",
"optional" : true,
"field" : "parent_sales_id"
}, {
"type" : "int16",
"optional" : true,
"field" : "operator_id"
}, {
"type" : "int32",
"optional" : true,
"field" : "customer_id"
}, {
"type" : "int16",
"optional" : true,
"field" : "product_id"
}, {
"type" : "int16",
"optional" : true,
"field" : "platform"
}, {
"type" : "string",
"optional" : true,
"field" : "currency"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "sales_amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "profit_amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "6",
"connect.decimal.precision" : "13"
},
"field" : "sales_contribution_amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "sales_rtp_contribution_amount"
}, {
"type" : "bytes",
"optional" : true,
"name" : "org.apache.kafka.connect.data.Decimal",
"version" : 1,
"parameters" : {
"scale" : "2",
"connect.decimal.precision" : "18"
},
"field" : "sales_profit_amount"
}, {
"type" : "int16",
"optional" : true,
"field" : "sales_status"
}, {
"type" : "int16",
"optional" : true,
"field" : "product_type"
}, {
"type" : "int16",
"optional" : true,
"field" : "transaction_type"
}, {
"type" : "int16",
"optional" : true,
"field" : "sales_stage"
}, {
"type" : "int64",
"optional" : true,
"name" : "io.debezium.time.MicroTimestamp",
"version" : 1,
"field" : "create_time"
}, {
"type" : "int64",
"optional" : true,
"name" : "io.debezium.time.MicroTimestamp",
"version" : 1,
"field" : "update_time"
}, {
"type" : "boolean",
"optional" : true,
"field" : "is_deduct"
} ],
"optional" : true,
"name" : "mysql_dev_01.da_testing_db.testcdc.Value",
"field" : "after"
}, {
"type" : "struct",
"fields" : [ {
"type" : "string",
"optional" : true,
"field" : "version"
}, {
"type" : "string",
"optional" : false,
"field" : "name"
}, {
"type" : "int64",
"optional" : false,
"field" : "server_id"
}, {
"type" : "int64",
"optional" : false,
"field" : "ts_sec"
}, {
"type" : "string",
"optional" : true,
"field" : "gtid"
}, {
"type" : "string",
"optional" : false,
"field" : "file"
}, {
"type" : "int64",
"optional" : false,
"field" : "pos"
}, {
"type" : "int32",
"optional" : false,
"field" : "row"
}, {
"type" : "boolean",
"optional" : true,
"default" : false,
"field" : "snapshot"
}, {
"type" : "int64",
"optional" : true,
"field" : "thread"
}, {
"type" : "string",
"optional" : true,
"field" : "db"
}, {
"type" : "string",
"optional" : true,
"field" : "table"
}, {
"type" : "string",
"optional" : true,
"field" : "query"
} ],
"optional" : false,
"name" : "io.debezium.connector.mysql.Source",
"field" : "source"
}, {
"type" : "string",
"optional" : false,
"field" : "op"
}, {
"type" : "int64",
"optional" : true,
"field" : "ts_ms"
} ],
"optional" : false,
"name" : "mysql_dev_01.da_testing_db.testcdc.Envelope"
},
"payload" : {
"before" : null,
"after" : {
"id" : 0,
"parent_id" : 2,
"operator_id" : 1,
"player_id" : 40010,
"game_id" : 17,
"platform" : 1,
"currency" : "EUR",
"sales_amount" : "ANbY",
"profit_amount" : "dTA=",
"sales_contribution_amount" : "AA==",
"sales_rtp_contribution_amount" : "AA==",
"sales_profit_amount" : "AA==",
"sales_status" : 3,
"product_type" : 1,
"transaction_type" : 1,
"sales_stage" : 0,
"create_time" : null,
"update_time" : null,
"is_deduct" : true
},
"source" : {
"version" : "0.9.0-SNAPSHOT",
"name" : "MYSQL-DEV-01",
"server_id" : 0,
"ts_sec" : 0,
"gtid" : null,
"file" : "mysql-bin.000228",
"pos" : 669870889,
"row" : 0,
"snapshot" : true,
"thread" : null,
"db" : "da_testing_db",
"table" : "testcdc",
"query" : null
},
"op" : "c",
"ts_ms" : 1537954045005
}
}'. This is often not an issue, but if it happens repeatedly please check the 'mysql-dev-01.da_testing_db.testcdc' topic. (io.debezium.relational.history.KafkaDatabaseHistory:234)
I thought I should get those information on my consumer, but it is not, and when I pump data into my mysql server, I did not see any real time streaming coming through as well.
Ronnie10