Debezium Server - GCP Pub/Sub - MSSQL


muthu selvam

Sep 12, 2022, 3:43:26 AM
to debezium
Hi,

I am using Debezium Server to stream SQL Server 2017 data into GCP Pub/Sub and reading that data with a subscriber. When I try to add a message filter, it does not filter the messages based on the condition. I have included my application.properties below.

debezium.sink.type=pubsub
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.storage.file.filename=offsets-new.dat
debezium.source.database.hostname=az.database.windows.net
debezium.source.database.port=1433
debezium.source.database.user= 
debezium.source.database.password= 
debezium.source.database.dbname=test
debezium.source.table.include.list=dbo.test
debezium.source.database.history.file.filename=FileDatabaseHistory.dat
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.source.time.precision.mode=connect

debezium.source.transforms=filter
debezium.source.transforms.filter.type=io.debezium.transforms.Filter
debezium.source.transforms.filter.language=jsr223.groovy
debezium.source.transforms.filter.condition="value.op == 'u'"
debezium.source.transforms.filter.topic.regex=test_topic

quarkus.log.console.json=false
debezium.source.include.schema.changes=false

jiri.p...@gmail.com

Sep 12, 2022, 4:42:27 AM
to debezium
Hi,

Which Debezium version do you use? Could you please share the log too?

J.

muthu selvam

Sep 12, 2022, 5:54:23 AM
to debezium
Hi, I am using version 1.9.5.

jiri.p...@gmail.com

Sep 12, 2022, 6:02:30 AM
to debezium
Could you please provide the log?

J.

muthu selvam

Sep 12, 2022, 6:08:23 AM
to debezium
Please find the logs below.

__  ____  __  _____   ___  __ ____  ______
 --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
 -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \  
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/  
2022-09-12 14:34:48,786 INFO  [io.deb.ser.BaseChangeConsumer] (main) Using 'io.debezium.server.BaseChangeConsumer$$Lambda$211/0x0000000800308040@795fd838' stream name mapper
2022-09-12 14:34:48,970 INFO  [io.deb.ser.pub.PubSubChangeConsumer] (main) Using default PublisherBuilder 'io.debezium.server.pubsub.PubSubChangeConsumer$$Lambda$221/0x000000080030d040@58e85c6f'
2022-09-12 14:34:48,970 INFO  [io.deb.ser.DebeziumServer] (main) Consumer 'io.debezium.server.pubsub.PubSubChangeConsumer' instantiated
2022-09-12 14:34:49,048 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = true

2022-09-12 14:34:49,051 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

2022-09-12 14:34:49,057 INFO  [io.deb.tra.Filter] (main) Using language 'jsr223.groovy' to evaluate expression '"value.op == 'u'"'
2022-09-12 14:34:49,683 INFO  [io.deb.emb.EmbeddedEngine$EmbeddedConfig] (main) EmbeddedConfig values:
    access.control.allow.methods =
    access.control.allow.origin =
    admin.listeners = null
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = use_all_dns_ips
    config.providers = []
    connector.client.config.override.policy = All
    header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    listeners = [http://:8083]
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    offset.flush.interval.ms = 10000
    offset.flush.timeout.ms = 60000
    offset.storage.file.filename = offsets-new.dat
    offset.storage.partitions = null
    offset.storage.replication.factor = null
    offset.storage.topic =
    plugin.path = null
    response.http.headers.config =
    rest.advertised.host.name = null
    rest.advertised.listener = null
    rest.advertised.port = null
    rest.extension.classes = []
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    task.shutdown.graceful.timeout.ms = 5000
    topic.creation.enable = true
    topic.tracking.allow.reset = true
    topic.tracking.enable = true
    value.converter = class org.apache.kafka.connect.json.JsonConverter

2022-09-12 14:34:49,683 WARN  [org.apa.kaf.con.run.WorkerConfig] (main) The worker has been configured with one or more internal converter properties ([internal.key.converter, internal.value.converter]). Support for these properties was deprecated in version 2.0 and removed in version 3.0, and specifying them will have no effect. Instead, an instance of the JsonConverter with schemas.enable set to false will be used. For more information, please visit http://kafka.apache.org/documentation/#upgrade and consult the upgrade notesfor the 3.0 release.
2022-09-12 14:34:49,684 WARN  [org.apa.kaf.con.run.WorkerConfig] (main) Variables cannot be used in the 'plugin.path' property, since the property is used by plugin scanning before the config providers that replace the variables are initialized. The raw value 'null' was used for plugin scanning, as opposed to the transformed value 'null', and this may cause unexpected results.
2022-09-12 14:34:49,685 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = key
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

2022-09-12 14:34:49,685 INFO  [org.apa.kaf.con.jso.JsonConverterConfig] (main) JsonConverterConfig values:
    converter.type = value
    decimal.format = BASE64
    schemas.cache.size = 1000
    schemas.enable = false

2022-09-12 14:34:49,687 INFO  [io.deb.ser.DebeziumServer] (main) Engine executor started
2022-09-12 14:34:49,694 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-7-thread-1) Starting FileOffsetBackingStore with file offsets-new.dat
2022-09-12 14:34:49,832 INFO  [io.quarkus] (main) debezium-server-dist 1.9.5.Final on JVM (powered by Quarkus 2.7.2.Final) started in 7.364s. Listening on: http://0.0.0.0:8080
2022-09-12 14:34:49,834 INFO  [io.quarkus] (main) Profile prod activated.
2022-09-12 14:34:49,834 INFO  [io.quarkus] (main) Installed features: [cdi, resteasy, resteasy-jackson, smallrye-context-propagation, smallrye-health, vertx]
2022-09-12 14:34:50,775 INFO  [io.deb.jdb.JdbcConnection] (pool-9-thread-1) Connection gracefully closed
2022-09-12 14:34:50,785 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) Starting SqlServerConnectorTask with configuration:
2022-09-12 14:34:50,786 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    connector.class = io.debezium.connector.sqlserver.SqlServerConnector
2022-09-12 14:34:50,787 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    transforms = filter
2022-09-12 14:34:50,787 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    include.schema.changes = false
2022-09-12 14:34:50,787 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    tombstones.on.delete = false
2022-09-12 14:34:50,787 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    offset.storage.file.filename = offsets-new.dat
2022-09-12 14:34:50,787 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    debezium.sink.pubsub.project.id =
2022-09-12 14:34:50,787 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    value.converter = org.apache.kafka.connect.json.JsonConverter
2022-09-12 14:34:50,787 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    key.converter = org.apache.kafka.connect.json.JsonConverter
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.history.file.filename = FileDatabaseHistory.dat
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.user =
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.dbname = resellermobqa
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    transforms.filter.topic.regex = test_topic
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    debezium.sink.type = pubsub
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    offset.storage = org.apache.kafka.connect.storage.FileOffsetBackingStore
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    time.precision.mode = connect
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    offset.flush.timeout.ms = 60000
2022-09-12 14:34:50,788 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.server.name =
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    transforms.filter.language = jsr223.groovy
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    transforms.filter.type = io.debezium.transforms.Filter
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.port = 1433
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    offset.flush.interval.ms = 10000
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    key.converter.schemas.enable = false
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    internal.key.converter = org.apache.kafka.connect.json.JsonConverter
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    transforms.filter.condition = "value.op == 'u'"
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.hostname = testserver
2022-09-12 14:34:50,789 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.password = ********
2022-09-12 14:34:50,790 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    value.converter.schemas.enable = false
2022-09-12 14:34:50,790 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    name = pubsub
2022-09-12 14:34:50,790 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    internal.value.converter = org.apache.kafka.connect.json.JsonConverter
2022-09-12 14:34:50,790 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    table.include.list = dbo.test
2022-09-12 14:34:50,790 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1)    database.history = io.debezium.relational.history.FileDatabaseHistory
2022-09-12 14:34:50,930 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) Found previous partition offset SqlServerPartition [sourcePartition={server=testserver}]: {transaction_id=null, event_serial_no=1, commit_lsn=0000008a:00006748:0051, change_lsn=0000008a:00006748:001d}
2022-09-12 14:34:50,931 INFO  [io.deb.rel.his.DatabaseHistoryMetrics] (pool-7-thread-1) Started database history recovery
2022-09-12 14:34:50,962 INFO  [io.deb.rel.his.DatabaseHistoryMetrics] (pool-7-thread-1) Finished database history recovery of 5 change(s) in 30 ms
2022-09-12 14:34:51,506 WARN  [io.deb.con.sql.SqlServerDefaultValueConverter] (pool-7-thread-1) Cannot parse column default value '(CONVERT([datetimeoffset](7),sysutcdatetime(),(0)))' to type 'datetimeoffset'. Expression evaluation is not supported.
2022-09-12 14:34:51,564 WARN  [io.deb.con.sql.SqlServerDefaultValueConverter] (pool-7-thread-1) Cannot parse column default value '(CONVERT([datetimeoffset](7),sysutcdatetime(),(0)))' to type 'datetimeoffset'. Expression evaluation is not supported.
2022-09-12 14:34:51,629 WARN  [io.deb.con.sql.SqlServerDefaultValueConverter] (pool-7-thread-1) Cannot parse column default value '(CONVERT([datetimeoffset](7),sysutcdatetime(),(0)))' to type 'datetimeoffset'. Expression evaluation is not supported.
2022-09-12 14:34:51,687 WARN  [io.deb.con.sql.SqlServerDefaultValueConverter] (pool-7-thread-1) Cannot parse column default value '(CONVERT([datetimeoffset](7),sysutcdatetime(),(0)))' to type 'datetimeoffset'. Expression evaluation is not supported.
2022-09-12 14:34:51,703 INFO  [io.deb.uti.Threads] (pool-7-thread-1) Requested thread factory for connector SqlServerConnector, id = testserver named = change-event-source-coordinator
2022-09-12 14:34:51,708 INFO  [io.deb.uti.Threads] (pool-7-thread-1) Creating thread debezium-sqlserverconnector-testserver-change-event-source-coordinator
2022-09-12 14:34:51,713 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-testserver-change-event-source-coordinator) Metrics registered
2022-09-12 14:34:51,714 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-testserver-change-event-source-coordinator) Context created
2022-09-12 14:34:51,720 INFO  [io.deb.con.sql.SqlServerSnapshotChangeEventSource] (debezium-sqlserverconnector-testserver-change-event-source-coordinator) A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.
2022-09-12 14:34:51,721 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-testserver-change-event-source-coordinator) Snapshot ended with SnapshotResult [status=SKIPPED, offset=SqlServerOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.sqlserver.Source:STRUCT}, sourceInfo=SourceInfo [serverName=testserver, changeLsn=0000008a:00006748:001d, commitLsn=0000008a:00006748:0051, eventSerialNo=null, snapshot=FALSE, sourceTime=null], snapshotCompleted=false, eventSerialNo=1]]
2022-09-12 14:34:51,724 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-sqlserverconnector-testserver-change-event-source-coordinator) Connected metrics set to 'true'
2022-09-12 14:34:51,725 INFO  [io.deb.con.sql.SqlServerChangeEventSourceCoordinator] (debezium-sqlserverconnector-testserver-change-event-source-coordinator) Starting streaming
2022-09-12 14:34:51,727 INFO  [io.deb.con.sql.SqlServerStreamingChangeEventSource] (debezium-sqlserverconnector-testserver-change-event-source-coordinator) Last position recorded in offsets is 0000008a:00006748:0051(0000008a:00006748:001d)[1]

jiri.p...@gmail.com

Sep 12, 2022, 6:18:39 AM
to debezium
I think this line, debezium.source.transforms.filter.topic.regex=test_topic, is wrong, as it would not match the name of any topic to which you want to apply the SMT.

J.

muthu selvam

Sep 12, 2022, 6:38:45 AM
to debezium
No, the topic name was correct.

jiri.p...@gmail.com

Sep 12, 2022, 6:42:40 AM
to debezium
Looking at debezium.source.table.include.list=dbo.test, I'd expect the table name, and hence the topic name suffix, to be `test`. That definitely does not match `test_topic`, to which you are trying to apply the SMT.
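For reference, the topic.regex must match the full topic name that Debezium emits, which for the SQL Server connector is <database.server.name>.<schema>.<table>. A hedged sketch of the filter section, assuming the server name is `testserver` as shown in the logs above (the unescaped `.` in the regex still matches a literal dot, which avoids .properties backslash-escaping pitfalls):

```properties
debezium.source.transforms=filter
debezium.source.transforms.filter.type=io.debezium.transforms.Filter
debezium.source.transforms.filter.language=jsr223.groovy
# No surrounding double quotes: in a .properties file they become part of
# the value, and the Groovy expression would then be a (truthy) string literal.
debezium.source.transforms.filter.condition=value.op == 'u'
# Must match the full emitted topic name, not just "test_topic"
debezium.source.transforms.filter.topic.regex=testserver.dbo.test
```

Note the condition in the posted config and logs appears with literal quotes (`"value.op == 'u'"`); dropping them may also be needed for the condition to evaluate as a boolean rather than a non-empty string.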

J.

muthu selvam

Sep 22, 2022, 8:14:38 AM
to debezium
Thank you. I have updated the topic name and it's working as expected.
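To illustrate the behavior discussed above (this is not Debezium code, just a rough analogy): the Filter SMT applies its condition only to records whose topic matches topic.regex, drops matching records for which the condition is false, and passes non-matching topics through untouched. A minimal Python sketch, with hypothetical topic and event names:

```python
import re

def apply_filter(records, topic_pattern, condition):
    """Mimic Debezium's Filter SMT: for records on topics matching
    topic_pattern, keep only those where condition(value) is true;
    records on other topics pass through unchanged."""
    kept = []
    for topic, value in records:
        if re.fullmatch(topic_pattern, topic) and not condition(value):
            continue  # condition false on a matching topic -> dropped
        kept.append((topic, value))
    return kept

events = [
    ("testserver.dbo.test", {"op": "c"}),  # insert -> dropped by the filter
    ("testserver.dbo.test", {"op": "u"}),  # update -> kept
    ("other.topic", {"op": "c"}),          # regex does not match -> kept
]
filtered = apply_filter(events, r"testserver\.dbo\.test",
                        lambda v: v["op"] == "u")
```

This is also why a wrong topic.regex makes the SMT appear to do nothing: no topic matches, so every record passes through.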

George H

Oct 11, 2022, 10:35:07 AM
to debezium
Hi there,

My name is George, from a recruitment consultancy in the UK.

We are recruiting a Debezium consultant for a remote-working contract.

If you are interested, or know anyone with excellent Debezium experience, please email geo...@digisourced.com and rebecca...@recann.co.uk.