SQL Server stops after 5 days of operation

William Prigol Lopes

Apr 12, 2019, 11:02:28 AM
to debezium
I created a SQL Server source connector with Debezium v0.9.3, but after 5 days of operation the connector, running in distributed mode, failed with this error:

[2019-04-12 14:32:35,833] INFO Skipping change ChangeTablePointer [changeTable=Capture instance "ZZZZ_ZZA010" [sourceTableId=PROTHEUS_PRODUCAO.dbo.ZZA010, changeTableId=PROTHEUS_PRODUCAO.cdc.ZZZZ_ZZA010_CT, startLsn=00001780:0000edea:0001, changeTableObjectId=823451945, stopLsn=NULL], resultSet=SQLServerResultSet:382, completed=false, currentChangePosition=00001f4e:0000009b:000d(00001f4e:0000009b:0002)] as its position is smaller than the last recorded position 00001f4e:0000009b:000d(00001f4e:0000009b:0002) (io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource:169)
[2019-04-12 14:32:35,834] INFO Skipping change ChangeTablePointer [changeTable=Capture instance "ZZZZ_ZZA010" [sourceTableId=PROTHEUS_PRODUCAO.dbo.ZZA010, changeTableId=PROTHEUS_PRODUCAO.cdc.ZZZZ_ZZA010_CT, startLsn=00001780:0000edea:0001, changeTableObjectId=823451945, stopLsn=NULL], resultSet=SQLServerResultSet:382, completed=false, currentChangePosition=00001f4e:0000009b:000d(00001f4e:0000009b:0002)] as its position is smaller than the last recorded position 00001f4e:0000009b:000d(00001f4e:0000009b:0002) (io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource:169)
[2019-04-12 14:32:45,788] INFO WorkerSourceTask{id=source-protheus-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2019-04-12 14:32:45,788] INFO WorkerSourceTask{id=source-protheus-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2019-04-12 14:32:49,781] ERROR Producer failure (io.debezium.pipeline.ErrorHandler:36)
java.lang.ArrayIndexOutOfBoundsException: 297
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$2(TableSchemaBuilder.java:212)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:143)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:65)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:40)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:125)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:203)
    at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:493)
    at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:143)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:137)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:86)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-04-12 14:32:49,782] INFO Creating thread debezium-sqlserverconnector-protheus-error-handler (io.debezium.util.Threads:265)
[2019-04-12 14:32:49,782] ERROR Interrupted while stopping (io.debezium.connector.sqlserver.SqlServerConnectorTask:207)
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2067)
    at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
    at java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:675)
    at io.debezium.pipeline.ErrorHandler.stop(ErrorHandler.java:52)
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.cleanupResources(SqlServerConnectorTask.java:202)
    at io.debezium.pipeline.ErrorHandler.lambda$setProducerThrowable$0(ErrorHandler.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2019-04-12 14:32:49,784] INFO [Producer clientId=protheus-dbhistory] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:1139)
[2019-04-12 14:32:50,165] INFO WorkerSourceTask{id=source-protheus-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2019-04-12 14:32:50,166] INFO WorkerSourceTask{id=source-protheus-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2019-04-12 14:32:50,166] ERROR WorkerSourceTask{id=source-protheus-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.poll(SqlServerConnectorTask.java:158)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 297
    at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$2(TableSchemaBuilder.java:212)
    at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:143)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:65)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:40)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:125)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:203)
    at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:493)
    at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:143)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:137)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:86)
    ... 5 more


I tried upgrading to Debezium v0.9.4, but the error persists.

My environment:
Kafka 2.2
source: debezium-sqlserver v0.9.4

The configs:

zookeeper.properties:
dataDir=/home/william/kafka/data/zookeeper/
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

server.properties (I have 3 brokers with the same config, except for the IPs and broker.id):
############################# Server Basics #############################


# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0


############################# Socket Server Settings #############################


# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=CONTROLLER://192.168.240.70:9091,INTERNAL://192.168.240.70:9092


# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=CONTROLLER://192.168.240.70:9091,INTERNAL://192.168.240.70:9092


# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT


control.plane.listener.name=CONTROLLER
inter.broker.listener.name=INTERNAL


# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3


# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8


# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400


# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400


# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600




############################# Log Basics #############################


# A comma separated list of directories under which to store log files
log.dirs=/home/william/kafka/data/kafka/


# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3


# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1


############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1


############################# Log Flush Policy #############################


# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.


# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000


# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000


############################# Log Retention Policy #############################


# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.


# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168


# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824


# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824


# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000


############################# Zookeeper #############################


# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.240.70:2181


# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000




############################# Group Coordinator Settings #############################


# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0


connect-distributed.properties:
bootstrap.servers=192.168.240.70:9092,192.168.240.71:9092,192.168.240.72:9092,192.168.240.73:9092


# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster


# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true


# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets-1
offset.storage.replication.factor=3
#offset.storage.partitions=25


# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs-1
config.storage.replication.factor=3


# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status-1
status.storage.replication.factor=3
#status.storage.partitions=5


# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000


# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083


# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=


# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/william/kafka/connectors/


And, finally, my source config:
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.user=*****
database.dbname=PROTHEUS_PRODUCAO
enable.idempotence=true
tasks.max=1
database.history.kafka.bootstrap.servers=192.168.240.70:9092,192.168.240.71:9092,192.168.240.72:9092,192.168.240.73:9092
database.history.kafka.topic=dbreplication.protheus
acks=all
transforms=route
database.server.name=protheus
bootstrap.servers=192.168.240.70:9092,192.168.240.71:9092,192.168.240.72:9092,192.168.240.73:9092
database.port=1433
table.whitelist=dbo.SQB010,dbo.AD6010,dbo.ACO010,dbo.ACP010,dbo.ACH010,dbo.ACY010,dbo.CC2010,dbo.CD2010,dbo.CTT010,dbo.DA1010,dbo.DAI010,dbo.DAK010,dbo.SA1010,dbo.SA2010,dbo.SA3010,dbo.SA4010,dbo.SA7010,dbo.SAH010,dbo.SB1010,dbo.SB2010,dbo.SB3010,dbo.SB4010,dbo.SB6010,dbo.SB7010,dbo.SB8010,dbo.SB9010,dbo.SBE010,dbo.SBF010,dbo.SBM010,dbo.SC1010,dbo.SC2010,dbo.SC5010,dbo.SC6010,dbo.SC7010,dbo.SC9010,dbo.SCP010,dbo.SCT010,dbo.SD1010,dbo.SD2010,dbo.SD3010,dbo.SDA010,dbo.SDB010,dbo.SDC010,dbo.SDD010,dbo.SE1010,dbo.SE2010,dbo.SE3010,dbo.SE4010,dbo.SE5010,dbo.SED010,dbo.SF1010,dbo.SF2010,dbo.SF3010,dbo.SF4010,dbo.SF5010,dbo.SG1010,dbo.SJ0010,dbo.SP3010,dbo.SPC010,dbo.SPH010,dbo.SRA010,dbo.SRD010,dbo.SRJ010,dbo.SRV010,dbo.SU5010,dbo.SU7010,dbo.SUA010,dbo.SUB010,dbo.SW0010,dbo.SW1010,dbo.SW2010,dbo.SW3010,dbo.SW4010,dbo.SW5010,dbo.SW6010,dbo.SW7010,dbo.SW8010,dbo.SW9010,dbo.SWA010,dbo.SWB010,dbo.SWD010,dbo.SWP010,dbo.SX5010,dbo.SY4010,dbo.SY6010,dbo.SYA010,dbo.SYB010,dbo.SYD010,dbo.SYE010,dbo.SYP010,dbo.SYQ010,dbo.SZ1010,dbo.SZ2010,dbo.SZ3010,dbo.SZ4010,dbo.SZ9010,dbo.SZB010,dbo.SZC010,dbo.SZI010,dbo.SZJ010,dbo.SZK010,dbo.SZN010,dbo.ZB1010,dbo.ZB2010,dbo.ZB3010,dbo.ZB5010,dbo.ZB6010,dbo.ZP0010,dbo.ZP1010,dbo.ZP2010,dbo.ZP4010,dbo.ZZC010,dbo.ZZF010,dbo.ZZG010,dbo.ZZH010,dbo.SPED050,dbo.SPED054,dbo.SP9010,dbo.ZZA010
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\.([^.]+)\.([^.]+)
database.hostname=192.168.240.244
database.password=*******
transforms.route.replacement=$3
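
For readers unfamiliar with the `route` transform in the config above, here is a minimal Python sketch of what that regex and replacement appear to do to topic names. It is not Kafka's implementation. It assumes that the connector emits topics named `<server name>.<schema>.<table>` (for example `protheus.dbo.ZZA010`) and that the router only rewrites a name when the whole name matches the pattern; the `route` function is a hypothetical helper written for this illustration.

```python
import re

# Pattern and replacement copied from the connector config above.
# Java's "$3" back-reference is written "\g<3>" in Python.
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")
ROUTE_REPLACEMENT = r"\g<3>"


def route(topic: str) -> str:
    """Return the rewritten topic name, or the original name if the
    pattern does not match the whole name (assumed router behaviour)."""
    match = ROUTE_REGEX.fullmatch(topic)
    if match is None:
        return topic
    return match.expand(ROUTE_REPLACEMENT)


if __name__ == "__main__":
    # Example names are assumptions based on database.server.name=protheus.
    for name in ("protheus.dbo.ZZA010", "protheus.dbo.SA1010", "protheus"):
        print(name, "->", route(name))
```

Under these assumptions, each table's change events land in a topic named after the table alone, while names without exactly three dot-separated parts pass through unchanged.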

I tried removing the "ZZA010" table from the source connector, since a message about it appears in the log just before the error, but the error persists and the connector stops in exactly the same way.
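
The "Skipping change" INFO lines in the first log compare two change positions. As a rough sketch of how such positions can be ordered, the Python below parses the values printed in the log. It assumes each LSN is three colon-separated hexadecimal fields compared left to right, and that the value in parentheses acts as a secondary key after the first one. Both are assumptions, and `parse_lsn` and `parse_position` are hypothetical helpers, not Debezium code.

```python
from typing import Tuple

Lsn = Tuple[int, int, int]
Position = Tuple[Lsn, Lsn]


def parse_lsn(text: str) -> Lsn:
    """Parse 'xxxxxxxx:xxxxxxxx:xxxx' into three integers (hex fields)."""
    parts = text.split(":")
    if len(parts) != 3:
        raise ValueError(f"expected three colon-separated hex fields: {text!r}")
    first, second, third = (int(part, 16) for part in parts)
    return (first, second, third)


def parse_position(text: str) -> Position:
    """Parse 'lsn(lsn)' as printed in the log into a pair of LSN tuples."""
    outer, sep, inner = text.partition("(")
    if not sep or not inner.endswith(")"):
        raise ValueError(f"expected 'lsn(lsn)': {text!r}")
    return (parse_lsn(outer), parse_lsn(inner[:-1]))


if __name__ == "__main__":
    # Values copied from the log lines at 14:32:35.
    current = parse_position("00001f4e:0000009b:000d(00001f4e:0000009b:0002)")
    last_recorded = parse_position("00001f4e:0000009b:000d(00001f4e:0000009b:0002)")
    capture_start = parse_lsn("00001780:0000edea:0001")

    # Tuples compare field by field, left to right.
    print("current == last recorded:", current == last_recorded)
    print("current <  last recorded:", current < last_recorded)
    print("capture start < current :", capture_start < current[0])
```

In the two logged lines the current and last recorded positions are identical, so the skip looks consistent with a less-than-or-equal check rather than a strict "smaller than". That would also fit the observation that removing ZZA010 did not change the failure, although the log alone does not prove it.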

Jiri Pechanec

Apr 18, 2019, 2:40:49 AM
to debezium
Hi,

was there any change in the database schema?

J.

William Prigol Lopes

May 6, 2019, 10:05:57 AM
to debezium
Hi, Jiri,

No, just normal operations (insert/update/delete) on the source.

I updated Debezium from version 0.9.3 to version 0.9.4. As of today, the service has been running perfectly for three weeks.

Thanks,