PostgreSQL connector: BigDecimal has mismatching scale value for given Decimal schema error.


Takahiro Hozumi

Aug 2, 2017, 5:35:32 AM
to debezium
Hi,

I am trying to use the PostgreSQL connector.
While several tables were successfully exported to Kafka, I got the following error.

[2017-08-02 08:14:19,815] ERROR Task mydebezium-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
 at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:69)
 at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:237)
 at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:341)
 at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
 at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:487)
 at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
 at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:197)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
[2017-08-02 08:14:19,816] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2017-08-02 08:14:19,816] INFO Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)


Debezium version: 0.5.2-20170802.075935-7
Kafka-Connect: confluentinc/cp-kafka-connect:3.2.2-1 (docker)
Kafka: confluentinc/cp-kafka:3.2.0 (docker)
Schema registry: confluentinc/cp-schema-registry:3.2.0 (docker)
PostgreSQL: 9.6.3

debezium connector config:
{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "mydb_staging",
    "database.hostname": "mydb-test-db-1",
    "database.password": "mypass",
    "database.port": "6543",
    "database.server.name": "mydb",
    "database.user": "postgres",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://my_schema_registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://ssp-cmn-kafka2.scaleout.local:8081"
}


I thought the issue described at https://issues.jboss.org/browse/DBZ-287 might be related, but the error still happens with 0.5.2-SNAPSHOT, which includes the fix for DBZ-287.
Unfortunately, I couldn't identify the actual table/record that causes this error, because the target database has many tables.
Can you please let me know if I have missed something else? I appreciate your help.

Thanks,
Takahiro.

Gunnar Morling

Aug 2, 2017, 6:02:41 AM
to debe...@googlegroups.com
Hi,

Hmm, that's weird; indeed, it shouldn't occur any longer with the latest snapshot. Could you try enabling logging (as per http://debezium.io/docs/logging/) to find out which columns/values this is about?
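For reference, raising the Debezium loggers to DEBUG can be done in Kafka Connect's log4j.properties (a sketch only; the exact file location depends on your installation, and the Confluent Docker images used here may instead take logger levels via environment variables):

```
# keep everything else at INFO, but log Debezium internals verbosely
log4j.rootLogger=INFO, stdout
log4j.logger.io.debezium=DEBUG
```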

--Gunnar


--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+unsubscribe@googlegroups.com.
To post to this group, send email to debe...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/7bf1dcb6-faaf-40a6-9a2b-8b904e86ead4%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Takahiro Hozumi

Aug 3, 2017, 5:04:50 AM
to debezium
Thank you for telling me about logging.
I changed the root logging level to DEBUG, but I cannot identify which columns/values cause this error.
Is the last SourceRecord logged before the failure the most likely culprit?

[2017-08-02 13:31:12,656] DEBUG Placed source record 'SourceRecord{sourcePartition={server=mydb}, sourceOffset={last_snapshot_record=true, lsn=179752223304, txId=746601, ts_usec=1501680672654000, snapshot=true}} ConnectRecord{topic='mydb.public.exchange_rates', kafkaPartition=null, key=Struct{id=24}, value=Struct{after=Struct{id=24,code=USD,date=17379,rate=110.350,created_at=1501481515082000000,updated_at=1501481515082000000},source=Struct{name=mydb,ts_usec=1501680672654000,txId=746601,lsn=179752223304,snapshot=true,last_snapshot_record=false},op=r,ts_ms=1501680672654}, timestamp=null}' into queue (io.debezium.connector.postgresql.PostgresConnectorTask)

I will try to find the table and value by binary search with the whitelist, but it will take some time.

Thanks,
Takahiro

Takahiro Hozumi

Aug 3, 2017, 8:37:41 AM
to debezium
Hi,

I found that the issue occurs with the following table:

# \d mytable
                               Table "public.mytable"
   Column   |            Type             |                      Modifiers
------------+-----------------------------+------------------------------------------------------
 id         | integer                     | not null default nextval('mytable_id_seq'::regclass)
 name       | character varying(255)      | not null
 key        | character varying(255)      | not null
 price      | numeric                     |
 select_p   | integer                     | default 2
 myfield1   | integer                     | not null
 created_at | timestamp without time zone |
 updated_at | timestamp without time zone |
Indexes:
    "mytable_pkey" PRIMARY KEY, btree (id)
    "index_mytable_on_myfield1" UNIQUE, btree (myfield1)



And a sample of the logging output:

[2017-08-03 11:34:27,038] DEBUG Mapped primary key for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,038] DEBUG Mapped columns for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "key", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "price", "index" : "3", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "select_p", "index" : "4", "schema" : {"type" : "INT32", "optional" : "true"}}, {"name" : "myfield1", "index" : "5", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "created_at", "index" : "6", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}, {"name" : "updated_at", "index" : "7", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,049] INFO Step 2: locking each of the database tables, waiting a maximum of '10.0' seconds for each lock (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,080] DEBUG checking for more records... (io.debezium.connector.postgresql.PostgresConnectorTask)
[2017-08-03 11:34:27,080] DEBUG no records available yet, sleeping a bit... (io.debezium.connector.postgresql.PostgresConnectorTask)
[2017-08-03 11:34:27,126] DEBUG refreshing DB schema for table 'public.mytable1' (io.debezium.connector.postgresql.PostgresSchema)
[2017-08-03 11:34:27,126] DEBUG Mapping table 'public.mytable1' to schemas under 'mydb.public.mytable1' (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'id' (INT32) from column id serial(10,0) NOT NULL AUTO_INCREMENTED (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'id' (INT32) from column id serial(10,0) NOT NULL AUTO_INCREMENTED (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'name' (STRING) from column name varchar(255,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'key' (STRING) from column key varchar(255,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'price' (BYTES) from column price numeric(131089,0) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'select_p' (INT32) from column select_p int4(10,0) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,126] DEBUG - field 'myfield1' (INT32) from column myfield1 int4(10,0) NOT NULL (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG - field 'created_at' (INT64) from column created_at timestamp(29,6) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG - field 'updated_at' (INT64) from column updated_at timestamp(29,6) (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG Mapped primary key for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Key", "type" : "STRUCT", "optional" : "false", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,127] DEBUG Mapped columns for table 'public.mytable1' to schema: {"name" : "mydb.public.mytable1.Value", "type" : "STRUCT", "optional" : "true", "fields" : [{"name" : "id", "index" : "0", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "name", "index" : "1", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "key", "index" : "2", "schema" : {"type" : "STRING", "optional" : "false"}}, {"name" : "price", "index" : "3", "schema" : {"name" : "org.apache.kafka.connect.data.Decimal", "type" : "BYTES", "optional" : "true", "version" : "1"}}, {"name" : "select_p", "index" : "4", "schema" : {"type" : "INT32", "optional" : "true"}}, {"name" : "myfield1", "index" : "5", "schema" : {"type" : "INT32", "optional" : "false"}}, {"name" : "created_at", "index" : "6", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}, {"name" : "updated_at", "index" : "7", "schema" : {"name" : "io.debezium.time.NanoTimestamp", "type" : "INT64", "optional" : "true", "version" : "1"}}]} (io.debezium.relational.TableSchemaBuilder)
[2017-08-03 11:34:27,132] INFO   read xlogStart at '28/9807C450' from transaction '727746' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,132] INFO Step 3: reading and exporting the contents of each table (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,136] INFO   exporting data from table 'public.mytable1' (io.debezium.connector.postgresql.RecordsSnapshotProducer)
[2017-08-03 11:34:27,171] DEBUG sending read event 'SourceRecord{sourcePartition={server=mydb}, sourceOffset={last_snapshot_record=false, lsn=174349337680, txId=727746, ts_usec=1501760067167000, snapshot=true}} ConnectRecord{topic='mydb.public.mytable1', kafkaPartition=null, key=Struct{id=41}, value=Struct{after=Struct{id=41,name=testdata1,key=2134-e78c6c52-9fbe-4bdb-a909-2b37c0284ea7,price=199.0,select_p=1,myfield1=65,created_at=1478167039574000000,updated_at=1478167039574000000},source=Struct{name=mydb,ts_usec=1501760067167000,txId=727746,lsn=174349337680,snapshot=true,last_snapshot_record=false},op=r,ts_ms=1501760067170}, timestamp=null}' (io.debezium.connector.postgresql.RecordsSnapshotProducer)


And the actual data in the numeric field is as follows:

# select distinct(price) from mytable;
  price
---------
   223.0
   556.0
   500.0
  1000.0
   230.0
   110.0
   130.0
   340.0
   101.0
     1.0
 12345.0
   200.0
     8.0
 10000.0
   600.0
   150.0
    50.0
   199.0
   400.0
   100.0
    40.0
   123.0
   256.0
  1112.0
  1234.0
     2.0
   218.0
   378.0
    10.0
   300.0
(30 rows)
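A likely explanation (a sketch inferred from the thread, not confirmed by it): the DEBUG log above maps the unconstrained NUMERIC column as numeric(131089,0), i.e. a Connect Decimal schema with scale 0, while stored values such as 199.0 arrive as a BigDecimal with scale 1 — which Kafka Connect's Decimal.fromLogical rejects. A minimal self-contained Java illustration, with fromLogical reduced to just the scale check (the real method also serializes the value):

```java
import java.math.BigDecimal;

public class ScaleMismatchDemo {

    // Simplified stand-in for the scale check inside Kafka Connect's
    // org.apache.kafka.connect.data.Decimal.fromLogical (assumption: only
    // the mismatch check is reproduced here).
    static byte[] fromLogical(int schemaScale, BigDecimal value) {
        if (value.scale() != schemaScale) {
            throw new RuntimeException(
                    "BigDecimal has mismatching scale value for given Decimal schema");
        }
        return value.unscaledValue().toByteArray();
    }

    public static void main(String[] args) {
        // "199.0" parses to a BigDecimal with scale 1, but the schema derived
        // from an unconstrained NUMERIC column declares scale 0.
        BigDecimal price = new BigDecimal("199.0");
        System.out.println("value scale = " + price.scale()); // prints 1
        try {
            fromLogical(0, price);
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a schema scale of 1 the same value would pass the check, which is consistent with the error appearing only on tables whose NUMERIC columns have no declared precision/scale.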

I hope the above data helps you identify the problem.

Thanks,
Takahiro

Randall Hauch

Aug 4, 2017, 12:01:48 PM
to debezium
Nice detective work! Can you please log an issue in the Debezium JIRA with all of the above details? Thanks!


Takahiro Hozumi

Aug 5, 2017, 4:01:29 AM
to debezium
Hi,

Thanks for the response.
I reported this issue in JIRA.

Thanks,
Takahiro