Issue converting Geometries

Bas van Kesteren

unread,
Mar 25, 2021, 12:20:32 PM
to debezium
Hi,

We have a Postgres database containing PostGIS data types, mostly multipolygons, which are stored in HEXEWKB geometry format in the database.

For example:
"010600002040710000010000000103000000010000000500000000000000007AF940000000004C4F184100000000607AF940000000002C4E184100000000C07DF94000000000344E184100000000807DF94000000000704F184100000000007AF940000000004C4F1841"

When the data is processed by Debezium and uploaded to the Kafka brokers, the geometry becomes a struct containing a WKB and an SRID. The SRID part is correct, but the WKB looks like "B@12333ee".

Does anyone know what is happening here and how to fix it?

Thanks in advance.

Bas

jiri.p...@gmail.com

unread,
Mar 26, 2021, 12:42:41 AM
to debezium
Hi,

could you please share
* DDL of the table with the problematic column
* Debezium version
* connector configuration

Thanks

J.

Bas van Kesteren

unread,
Mar 26, 2021, 6:46:39 AM
to debezium

Hi,

Our config is below. In the DDL, the problem fields are the geometry columns (graafpolygoon, orientatiepolygoon and informatiepolygoon); all the geometry fields are problematic, across all the tables.

Regards,

Bas


Debezium version: 1.4.2.Final (Docker image)

Config properties:

name=klic-test-debez-archief-connector

connector.class=io.debezium.connector.postgresql.PostgresConnector

tasks.max=1

database.hostname=$DATABASE_HOSTNAME

database.port=5432

database.user=klicwin_db

database.password=$DATABASE_PASSWORD

database.dbname=KLICWINARCHIEF

database.server.name=$TOPIC_PREFIX

database.whitelist=enxta110-rds03

database.history.kafka.topic=110-klic-melding

# I pity the fool that uses spaces in between values...

table.include.list=public.klicgebiedsaanvraag,public.klicgebiedsleveringthema,public.klicgebiedslevering,public.klicbeheerdersaanvraag

plugin.name=pgoutput

slot.name=debezium_klic_archief

transforms=unwrap,dropPrefix

transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter

transforms.dropPrefix.regex=(.*).public.(.*)

transforms.dropPrefix.replacement=$1$2

output.data.format=JSON


DDL:

CREATE TABLE public.klicgebiedsaanvraag

(

    gebiedsaanvraagid character varying(50) COLLATE pg_catalog."default" NOT NULL,

    klicmeldnummer character varying(50) COLLATE pg_catalog."default",

    ordernummer character varying(50) COLLATE pg_catalog."default",

    mutatiedatum timestamp without time zone,

    omschrijvingwerkzaamheden character varying(2000) COLLATE pg_catalog."default",

    referentie character varying(100) COLLATE pg_catalog."default",

    soortwerkzaamheden character varying(200) COLLATE pg_catalog."default",

    startdatum timestamp without time zone,

    einddatum timestamp without time zone,

    verzoekcoordinatie boolean,

    verzoekmedegebruik boolean,

    aanvraagdatum timestamp without time zone,

    aanvraagsoort character varying(50) COLLATE pg_catalog."default",

    avcontactemail character varying(100) COLLATE pg_catalog."default",

    avcontactnaam character varying(200) COLLATE pg_catalog."default",

    avcontacttelefoon character varying(20) COLLATE pg_catalog."default",

    avextraemail character varying(100) COLLATE pg_catalog."default",

    avorgnaam character varying(200) COLLATE pg_catalog."default",

    avorgnaam2 character varying(200) COLLATE pg_catalog."default",

    avorgkvknummer character varying(20) COLLATE pg_catalog."default",

    avorgbezoekhuisletter character varying(20) COLLATE pg_catalog."default",

    avorgbezoekhuisnummer character varying(20) COLLATE pg_catalog."default",

    avorgbezoekhuisnrtoevoeging character varying(20) COLLATE pg_catalog."default",

    avorgbezoekbagid character varying(100) COLLATE pg_catalog."default",

    avorgbezoeklandcode character varying(50) COLLATE pg_catalog."default",

    avorgbezoekopenbareruimtenaam character varying(100) COLLATE pg_catalog."default",

    avorgbezoekpostcode character varying(20) COLLATE pg_catalog."default",

    avorgbezoekwoonplaats character varying(100) COLLATE pg_catalog."default",

    avorgpostbuslandcode character varying(50) COLLATE pg_catalog."default",

    avorgpostbusnummer character varying(20) COLLATE pg_catalog."default",

    avorgpostbuspostcode character varying(20) COLLATE pg_catalog."default",

    avorgpostbuswoonplaats character varying(100) COLLATE pg_catalog."default",

    avorgtelefoon character varying(20) COLLATE pg_catalog."default",

    avorgmobiel character varying(20) COLLATE pg_catalog."default",

    avorgfax character varying(20) COLLATE pg_catalog."default",

    avorgemail character varying(100) COLLATE pg_catalog."default",

    avorgemail2 character varying(100) COLLATE pg_catalog."default",

    avorgwebsite character varying(200) COLLATE pg_catalog."default",

    ogcontactemail character varying(100) COLLATE pg_catalog."default",

    ogcontactnaam character varying(200) COLLATE pg_catalog."default",

    ogcontacttelefoon character varying(20) COLLATE pg_catalog."default",

    ogorgnaam character varying(200) COLLATE pg_catalog."default",

    ogorgnaam2 character varying(200) COLLATE pg_catalog."default",

    ogorgkvknummer character varying(20) COLLATE pg_catalog."default",

    ogorgbezoekhuisletter character varying(20) COLLATE pg_catalog."default",

    ogorgbezoekhuisnummer character varying(20) COLLATE pg_catalog."default",

    ogorgbezoekhuisnrtoevoeging character varying(20) COLLATE pg_catalog."default",

    ogorgbezoekbagid character varying(100) COLLATE pg_catalog."default",

    ogorgbezoeklandcode character varying(50) COLLATE pg_catalog."default",

    ogorgbezoekopenbareruimtenaam character varying(100) COLLATE pg_catalog."default",

    ogorgbezoekpostcode character varying(20) COLLATE pg_catalog."default",

    ogorgbezoekwoonplaats character varying(100) COLLATE pg_catalog."default",

    ogorgpostbuslandcode character varying(50) COLLATE pg_catalog."default",

    ogorgpostbusnummer character varying(20) COLLATE pg_catalog."default",

    ogorgpostbuspostcode character varying(20) COLLATE pg_catalog."default",

    ogorgpostbuswoonplaats character varying(100) COLLATE pg_catalog."default",

    ogorgtelefoon character varying(20) COLLATE pg_catalog."default",

    ogorgmobiel character varying(20) COLLATE pg_catalog."default",

    ogorgfax character varying(20) COLLATE pg_catalog."default",

    ogorgemail character varying(100) COLLATE pg_catalog."default",

    ogorgemail2 character varying(100) COLLATE pg_catalog."default",

    ogorgwebsite character varying(200) COLLATE pg_catalog."default",

    lwopenbareruimtenaam character varying(100) COLLATE pg_catalog."default",

    lwhuisnummer character varying(20) COLLATE pg_catalog."default",

    lwhuisletter character varying(20) COLLATE pg_catalog."default",

    lwhuisnummertoevoeging character varying(20) COLLATE pg_catalog."default",

    lwwoonplaats character varying(100) COLLATE pg_catalog."default",

    lwpostcode character varying(20) COLLATE pg_catalog."default",

    lwbagid character varying(100) COLLATE pg_catalog."default",

    dbabagid character varying(100) COLLATE pg_catalog."default",

    graafpolygoon geometry(MultiPolygon,28992),

    orientatiepolygoon geometry(MultiPolygon,28992),

    informatiepolygoon geometry(MultiPolygon,28992),

    CONSTRAINT klicgebiedsaanvraag_pk PRIMARY KEY (gebiedsaanvraagid)

)

WITH (

    OIDS = FALSE

)

TABLESPACE pg_default;



On Friday, March 26, 2021 at 05:42:41 UTC+1, jiri.p...@gmail.com wrote:

Gunnar Morling

unread,
Mar 26, 2021, 7:03:05 AM
to debe...@googlegroups.com
How are you consuming/reading those values? The wkb part is exported as a byte array, so it may just be a question of interpreting the value correctly. What you shared potentially looks like a toString representation of the byte array; the question is whether that happens in Debezium or in the consumers.

--Gunnar
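For what it's worth, here is a minimal Java sketch (not from the thread; the byte values are just the first bytes of the HEXEWKB example at the top) of why a raw byte array renders like that: printing a byte[] directly gives Java's default Object.toString(), a type marker plus the array's identity hash code in hex, rather than the array contents.

import java.util.Base64;

public class ByteArrayToStringDemo {
    public static void main(String[] args) {
        // First bytes of the HEXEWKB multipolygon from the start of the thread: 01 06 00 00 20 ...
        byte[] wkb = new byte[] { 0x01, 0x06, 0x00, 0x00, 0x20 };

        // Default toString() of a byte array: "[B@" plus the identity hash code in hex
        System.out.println(wkb);                                      // e.g. [B@7cfe6bed

        // To see the actual content, encode it explicitly
        System.out.println(Base64.getEncoder().encodeToString(wkb));  // AQYAACA=
        StringBuilder hex = new StringBuilder();
        for (byte b : wkb) {
            hex.append(String.format("%02X", b));
        }
        System.out.println(hex);                                      // 0106000020
    }
}

A string converter that simply calls toString() on the Connect value would emit exactly this default representation for the embedded byte array.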


Bas van Kesteren

unread,
Mar 26, 2021, 7:07:57 AM
to debezium

I can view the data in our Kafka platform and there is a struct containing WKB=B@1234567 and an SRID=28992. I was expecting a hex value there, because I have no idea how to interpret the value posted by Debezium.

Bas
On Friday, March 26, 2021 at 12:03:05 UTC+1, Gunnar Morling wrote:

jiri.p...@gmail.com

unread,
Mar 26, 2021, 8:07:33 AM
to debezium
Could you please try it with kafka-console-consumer.sh?

Bas van Kesteren

unread,
Mar 26, 2021, 10:29:41 AM
to debezium
Done, output is Struct{wkb=[B@7cfe6bed,srid=28992}

On Friday, March 26, 2021 at 13:07:33 UTC+1, jiri.p...@gmail.com wrote:

jiri.p...@gmail.com

unread,
Mar 29, 2021, 2:46:25 AM
to debezium
Hi,

I've just tried our PostgreSQL tutorial (1.5.0.CR1 but 1.4.2.Final should be no different). Before adding the connector I executed

create table test(id serial primary key, polygon geometry(MultiPolygon,28992));
insert into test values (default, ST_GeomFromText('MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)))', 28992));

and after the plugin start I executed
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --property print.key=true --topic dbserver1.inventory.test

and the result is
"after":{"id":1,"polygon":{"wkb":"AQYAACBAcQAAAQAAAAEDAAAAAQAAAAUAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA8D8AAAAAAAAAAAAAAAAAAPA/AAAAAAAA8D8AAAAAAAAAAAAAAAAAAPA/AAAAAAAAAAAAAAAAAAAAAA==","srid":28992}}

Could you please try it yourself and identify what is the difference from your current deployment?

But looking at the message you've posted, are you sure you are using the correct converter?

Thanks a lot

J.

Bas van Kesteren

unread,
Mar 29, 2021, 4:12:15 AM
to debezium
Thanks for looking into it. But which converter should I use then?

Like I said, our geometry is stored in HEXEWKB format in the database (see topic start), not the geometry string you are using. If I use the converter function in PostgreSQL I get "normal" working geometries out of it, so the data is solid, but it goes wrong in the conversion.

Bas
On Monday, March 29, 2021 at 08:46:25 UTC+2, jiri.p...@gmail.com wrote:

jiri.p...@gmail.com

unread,
Mar 29, 2021, 5:34:38 AM
to debezium
Hi,

You should use org.apache.kafka.connect.json.JsonConverter - it is configured either per connector or globally in Kafka Connect. It seems to me that you have org.apache.kafka.connect.storage.StringConverter in place.

Please check the key.converter and value.converter config in connect-distributed.properties.

J.
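For reference, the relevant lines in connect-distributed.properties would then look something like this (standard Kafka Connect settings, shown only as an example):

# use the JSON converter (rather than the StringConverter) for keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

With the JsonConverter in place, BYTES fields such as the geometry wkb are emitted as base64 strings in the JSON payload, as in the tutorial output above.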

Bas van Kesteren

unread,
Mar 29, 2021, 8:27:21 AM
to debezium
Yes. You are a hero :). This works. Thanks a million!

On Monday, March 29, 2021 at 11:34:38 UTC+2, jiri.p...@gmail.com wrote:

Kamal Chavda

unread,
Oct 6, 2021, 1:18:03 PM
to debezium
I've run into this same issue. I checked my connect-distributed.properties, and key.converter and value.converter are set correctly to org.apache.kafka.connect.json.JsonConverter.

The column definition in the Postgres table is: [columnname] geography(point, 4326) NOT NULL
An example value stored in the table looks like: POINT (-84.316056 33.881333)

Example Kafka message value: "point" : { "wkb" : "AQEAACDmEAAA5no2BviTXcB1T2ijhAxBQA==", "srid" : 4326 }

Gunnar Morling

unread,
Oct 6, 2021, 4:46:26 PM
to debezium
Kamal,

It's not quite clear what you're asking. The message representation you shared is JSON; the wkb value is the base64-encoded binary value. Can you elaborate on the problem you encounter?

--Gunnar
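As an illustration (not from the thread), a short Java sketch that decodes that value; it assumes the JTS library (org.locationtech.jts) is on the classpath, which is nothing the connector itself requires:

import java.util.Base64;

import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.WKBReader;

public class DecodeWkbDemo {
    public static void main(String[] args) throws Exception {
        // base64 wkb value taken from the message above
        byte[] wkb = Base64.getDecoder().decode("AQEAACDmEAAA5no2BviTXcB1T2ijhAxBQA==");

        // JTS WKBReader also understands the PostGIS EWKB SRID flag
        Geometry geometry = new WKBReader().read(wkb);
        System.out.println(geometry.toText());   // POINT (-84.316056 33.881333)
    }
}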
Message has been deleted

Kamal Chavda

unread,
Oct 11, 2021, 11:24:58 AM
to debezium
For some reason my reply messages keep getting deleted. 
I am working on a POC using Debezium to write from Aurora Postgres to Apache Pinot. When writing to Pinot I got the error below:

java.lang.IllegalStateException: Cannot read single-value from Collection: [AQEAACDmEAAA5no2BviTXcB1T2ijhAxBQA==, 4326] for column: point
    at shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:721) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at org.apache.pinot.core.data.recordtransformer.DataTypeTransformer.standardizeCollection(DataTypeTransformer.java:193) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at org.apache.pinot.core.data.recordtransformer.DataTypeTransformer.standardize(DataTypeTransformer.java:138) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at org.apache.pinot.core.data.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:88) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at org.apache.pinot.core.data.recordtransformer.CompositeTransformer.transform(CompositeTransformer.java:82) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:491) [pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:402) [pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:538) [pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-afa4b252ab1c424ddd6c859bb305b2aa342b66ed]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]

So I looked at the topic to see the actual message and noticed the formatting that I shared in my previous message. 
I am not sure if it's an issue with the Pinot schema. I've reached out to them on their Slack channel. I also noticed this post here and figured I'd inquire about it in case it's a config issue on the connector.

Below are the properties on the connector:
{
  "name": "perf-connector2",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "",
    "database.port": "5432",
    "database.user": "",
    "database.password": "",
    "database.dbname": "",
    "database.server.name": "",
    "database.sslmode": "require",
    "schema.include": "public",
    "table.include.list": "",
    "plugin.name": "pgoutput",
    "slot.name": "debezium",
    "publication.name": "dbz_publication",
    "column.exclude.list": "",
    "heartbeat.interval.ms": "30000",
    "publication.autocreate.mode": "filtered",
    "time.precision.mode": "connect",
    "decimal.handling.mode": "string",
    "hstore.handling.mode": "json",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Gunnar Morling

unread,
Oct 12, 2021, 3:16:50 AM
to debezium
Hi,

After talking to Kenny Bastani from the Apache Pinot team, I can confirm that POINT isn't a supported type in Pinot. I'm not sure how exactly you handle the inserts on the Pinot side (using some Kafka Connect connector?), but you'd first have to convert these column values into a format which is understood by Pinot. In Kafka Connect, an SMT would be a good solution for that job.

--Gunnar
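As a rough illustration of that suggestion (not from the thread; the class name, the hard-coded "point" field name and the hex encoding are assumptions), a custom SMT could replace the Debezium geometry struct with a flat hex string before the converter runs, along these lines:

package com.example.smt;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT: flattens a Debezium geometry struct (wkb bytes + srid) into a plain
// hex string so that sinks which cannot handle nested structs can ingest the column.
public class GeometryToHex<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String GEOMETRY_FIELD = "point"; // assumed column name

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Struct)) {
            return record; // tombstones etc. pass through untouched
        }
        Struct value = (Struct) record.value();
        Schema schema = value.schema();
        if (schema.field(GEOMETRY_FIELD) == null) {
            return record;
        }

        // Rebuild the value schema, replacing the geometry struct with an optional string
        SchemaBuilder builder = SchemaBuilder.struct().name(schema.name());
        for (Field field : schema.fields()) {
            if (field.name().equals(GEOMETRY_FIELD)) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            }
            else {
                builder.field(field.name(), field.schema());
            }
        }
        Schema newSchema = builder.build();

        // Copy all fields, converting the geometry's wkb bytes to a hex string
        Struct newValue = new Struct(newSchema);
        for (Field field : schema.fields()) {
            if (field.name().equals(GEOMETRY_FIELD)) {
                Struct geometry = value.getStruct(GEOMETRY_FIELD);
                newValue.put(field.name(), geometry == null ? null : toHex(geometry.getBytes("wkb")));
            }
            else {
                newValue.put(field.name(), value.get(field));
            }
        }
        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
                record.key(), newSchema, newValue, record.timestamp());
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02X", b));
        }
        return sb.toString();
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}

It would then be wired into the connector config like any other SMT, e.g. transforms=geomToHex plus transforms.geomToHex.type=com.example.smt.GeometryToHex (names hypothetical).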

Kamal Chavda

unread,
Oct 12, 2021, 10:08:45 AM
to debezium
Thank you for looking into this Gunnar. Much appreciated!
The inserts from Pinot are done using the Pinot stream ingestion plugin directly from the Kafka topic (https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/import-from-apache-kafka). There are transformations supported by Pinot, but I have not had any luck. According to the documentation I should be able to use geospatial functions that convert WKB to Geography (https://docs.pinot.apache.org/basics/indexing/geospatial-support#geometry-vs-geography).

Kamal Chavda

unread,
Oct 12, 2021, 10:21:22 AM
to debezium
*inserts into Pinot
Message has been deleted

Gunnar Morling

unread,
Oct 12, 2021, 4:52:18 PM
to debe...@googlegroups.com
Hum, something weird is happening; Kenny's reply got stuck in the approval queue, and while I'm 100% sure I "approved" it, it now shows up as "deleted" here. I got it in my own inbox though, hence I'm forwarding it to the list again (see below). Sorry for the hassle, something seems quite off with Google Groups lately.

Thanks for chiming in, Kenny!

--Gunnar

On Tue, Oct 12, 2021 at 22:34, 'Kenny Bastani' via debezium <debe...@googlegroups.com> wrote:
Sorry for my delayed reply here. Also, thanks Gunnar for sending along that information.

Regarding the original issue:

To ingest the point value from a WKB-formatted binary representation, there is a transformation function available in Apache Pinot that you can add to your schema configuration to create a generated column as a Geography instance, which allows you to perform geospatial operations using H3 indexing (https://h3geo.org/).

The value that the OP saw (B@12333ee) is Java's default string representation of the WKB byte array (a type marker plus the array's identity hash code in hexadecimal), rather than the geometry bytes themselves; it shows up whenever a byte array is turned into a string instead of being properly serialized. I think you were able to sort that issue out, which is great.

@kamal

The one thing I would point out here is that EWKB is not supported as a Geography instance using ingestion transformation functions available in Pinot. You'll need to encode the exported geospatial point as a hexadecimal string or byte array without an SRID. Once you've done that, the rest is pretty simple using a generated column in a Pinot schema.

Here is an example:


From the example link:

      "transformFunction": "toSphericalGeography(stPoint(driverLon,driverLat))"

This function here is using stPoint to convert two separate lat and lon columns into a Geometry point data type, which is required to convert the field into a Geography instance used for queries. The transformation function you'll use instead is ST_GeomFromWKB(bytes wkb) → Geometry, which will end up looking like this:

      "transformFunction": "toSphericalGeography(ST_GeomFromWKB(point))"

In this snippet, point is the name of the column that is in the payload of your Kafka message. Now you will be able to ingest the values into a real-time table and use geospatial functions in your Pinot SQL queries. See the geospatial documentation for more info, and I'll be happy to help if you need anything else.

Thanks,

Kenny Bastani
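To make the "hexadecimal string ... without an SRID" part concrete, here is a small sketch in the same spirit as the JTS example earlier in the thread (again assuming JTS is available; this is an illustration, not something from the thread): read the EWKB that Debezium emits, then re-serialize it as plain WKB hex with the SRID dropped.

import java.util.Base64;

import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKBWriter;

public class EwkbToPlainWkbHex {
    public static void main(String[] args) throws Exception {
        // base64 EWKB as it appears in the Kafka message (value from earlier in the thread)
        byte[] ewkb = Base64.getDecoder().decode("AQEAACDmEAAA5no2BviTXcB1T2ijhAxBQA==");

        Geometry geometry = new WKBReader().read(ewkb);      // parses the EWKB, SRID flag included
        byte[] plainWkb = new WKBWriter().write(geometry);   // the default writer emits WKB without an SRID
        System.out.println(WKBWriter.toHex(plainWkb));       // hex WKB string, suitable for ST_GeomFromWKB-style ingestion
    }
}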
Message has been deleted

Kamal Chavda

unread,
Oct 12, 2021, 5:56:34 PM
to debezium
Thank you Gunnar for reposting. Google Groups is definitely acting strange.

Hi Kenny, 
Thank you for your reply and the detailed explanation. 

I've actually been using the order-delivery-microservice-example as an example for my POC, mainly the Debezium/Pinot set-up and table/schema definitions. I have lon/lat columns in my schema along with POINT and LINESTRING. I ended up using the lon/lat columns and created a new POINT column using "transformFunction": "toSphericalGeography(stPoint(Lon,Lat))".
I will look into a Kafka Connect SMT as Gunnar suggested and split out the wkb value from the struct, since I still have a LINESTRING column to load. However, since Debezium generates the value as base64-encoded binary, would I have to decode and encode it again? Any suggestions?

Kenny Bastani

unread,
Oct 15, 2021, 11:43:51 AM
to debe...@googlegroups.com
You’re very welcome.

You should have no problem ingesting the encoded bytes into a real-time table using the Pinot byte data type specified in your schema. 

We have multiple different transforms that accept bytes for WKB. Use the appropriate transform function from the list here:

If you need some more help figuring things out, please head over to our community Slack channel and we can get on a call or figure it out over DM.


Carl Herbst

unread,
Apr 18, 2024, 1:49:37 PM
to debezium
Hi,

I have a similar problem to what Bas wrote about in 2021, but I am not using Postgres, rather MySQL.

I have polygon geometry data sent via a Debezium MySQL source connector to a Kafka topic, from where it needs to be sunk to another MySQL database, with CDC being maintained by the Debezium MySQL source connector.
If I use either the Avro or the JSON converter (with the schema registry) for value conversion, the polygon data is split into a wkb and an srid component.
The JDBC sink connector (Confluent) is unable to process this as it is a STRUCT. Using the Flatten$Value and ReplaceField$Value SMTs it still generates errors, with the final value being truncated for the destination column of type MySQL POLYGON. I suspect the wkb value first needs to be converted to some POLYGON text representation or another type.

When using the Confluent JDBC source connector, the data type is correctly stored in the topic as bytes and passes correctly back to the destination sink table using the JDBC sink connector.
I have a requirement to use the Debezium MySQL source connector due to its superior CDC abilities: doing a bulk initial load of the source table and then streaming automatically.

How can I transform/translate/recombine the resultant topic data of srid+wkb (from the Debezium connector) back to this format for MySQL?

POLYGON ((-34.81928470987648 20.05889332311438, -34.81910568154112 20.05890841408916, -34.81908079019686 20.05847424198321, -34.81925981836727 20.05845915008302, -34.81928470987648 20.05889332311438))

Chris Cranford

unread,
Apr 18, 2024, 2:01:35 PM
to debe...@googlegroups.com
Hi Carl -

This can easily be accomplished by using the Debezium JDBC sink connector rather than the Confluent JDBC sink. The Debezium connector is capable of understanding these complex Debezium source connector field types and can automatically serialize the data back into a geometry column at the target.

Hope that helps.
Chris

Carl Herbst

unread,
Apr 22, 2024, 1:58:29 PM
to debezium
Thanks Chris, 
I managed to get the data to the destination table with the JDBC sink connector into a MEDIUMBLOB field, and by using an insert/update trigger and the MySQL spatial functions on the destination table I can insert/update the polygon data into its correct field.
I am unable to load the Debezium JDBC sink connector (debezium-connector-jdbc-2.6.0.Final.jar) on the Confluent platform (cp-kafka-connect-base v7.5.3 standalone); there are some issues with the Java classes not being found despite all the *.jar files being in the correct place for this sink connector.
Has anybody else run into the same issue?

Chris Cranford

unread,
Apr 22, 2024, 2:11:10 PM
to debe...@googlegroups.com
Hi Carl -

Can you share the errors you're facing?

Chris

Carl Herbst

unread,
Apr 25, 2024, 12:46:52 PM
to debezium
Hi Chris,

Looks like it is the Scala module looking for a specific jackson-databind version. Incidentally, the sink connector properties are pasted below the error. I am trying to invoke the MariaDB driver due to the issue it's reporting here at the top, but it does not seem to register either. The source and sink databases are both MariaDB databases. The source Debezium connector works fine and has been for a long time.

[2024-04-23 06:52:26,612] WARN [sink-scout-polygons|task-7] HHH000511: The 5.5.0 version for [org.hibernate.dialect.MySQLDialect] is no longer supported, hence certain features may not work properly. The minimum supported version is 5.7.0. Check the community dialects project for available legacy versions. (org.hibernate.dialect.Dialect:351)
[2024-04-23 06:52:27,337] ERROR [sink-scout-polygons|task-5] WorkerSinkTask{id=sink-scout-polygons-5} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
java.lang.ExceptionInInitializerError
at org.hibernate.boot.internal.SessionFactoryOptionsBuilder.lambda$determineJsonFormatMapper$6(SessionFactoryOptionsBuilder.java:780)
at org.hibernate.boot.registry.selector.internal.StrategySelectorImpl.resolveStrategy(StrategySelectorImpl.java:220)
at org.hibernate.boot.registry.selector.internal.StrategySelectorImpl.resolveDefaultableStrategy(StrategySelectorImpl.java:180)
at org.hibernate.boot.internal.SessionFactoryOptionsBuilder.determineJsonFormatMapper(SessionFactoryOptionsBuilder.java:776)
at org.hibernate.boot.internal.SessionFactoryOptionsBuilder.<init>(SessionFactoryOptionsBuilder.java:297)
at org.hibernate.boot.internal.SessionFactoryBuilderImpl.<init>(SessionFactoryBuilderImpl.java:49)
at org.hibernate.boot.internal.DefaultSessionFactoryBuilderService.createSessionFactoryBuilder(DefaultSessionFactoryBuilderService.java:26)
at org.hibernate.boot.internal.MetadataImpl.getSessionFactoryBuilder(MetadataImpl.java:170)
at org.hibernate.cfg.Configuration.buildSessionFactory(Configuration.java:911)
at org.hibernate.cfg.Configuration.buildSessionFactory(Configuration.java:960)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.start(JdbcSinkConnectorTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.13.5 requires Jackson Databind version >= 2.13.0 and < 2.14.0 - Found jackson-databind version 2.14.2
at com.fasterxml.jackson.module.scala.JacksonModule.setupModule(JacksonModule.scala:61)
at com.fasterxml.jackson.module.scala.JacksonModule.setupModule$(JacksonModule.scala:46)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:17)
at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:853)
at com.fasterxml.jackson.databind.ObjectMapper.registerModules(ObjectMapper.java:1055)
at com.fasterxml.jackson.databind.ObjectMapper.findAndRegisterModules(ObjectMapper.java:1139)
at org.hibernate.type.format.jackson.JacksonXmlFormatMapper.createXmlMapper(JacksonXmlFormatMapper.java:48)
at org.hibernate.type.format.jackson.JacksonXmlFormatMapper.<init>(JacksonXmlFormatMapper.java:38)
at org.hibernate.type.format.jackson.JacksonIntegration.<clinit>(JacksonIntegration.java:17)
... 20 more
[2024-04-23 06:52:27,339] INFO [sink-scout-polygons|task-5] [Consumer clientId=connector-consumer-sink-scout-polygons-5, groupId=connect-sink-scout-polygons] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1025)

Sink connector properties:
{
    "name": "sink-db-polygons",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "8",
        "topics": "<db>.polygons",
        "batch.size": "25000",
        "consumer.override.max.poll.records": "25000",
        "consumer.override.max.partition.fetch.bytes": "8388608",
        "consumer.override.fetch.max.bytes": "8388608",
        "max.retries": "100",
        "retry.backoff.ms": "30000",
        "errors.tolerance": "all",
        "errors.retry.timeout": "10",
        "errors.log.enable": true,
        "errors.log.include.messages": true,
        "connection.url": "jdbc:mysql://<host>:3306/<db>",
        "connection.username": "<username>",
        "connection.password": "<pwd>",
        "connection.provider": "org.hibernate.c3p0.internal.C3P0ConnectionProvider",
        "connector.adapter": "mariadb",
        "database.protocol": "jdbc:mariadb",
        "database.jdbc.driver": "org.mariadb.jdbc.Driver",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "insert.mode": "upsert",
        "auto.evolve": "true",
        "delete.enabled": "true",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$2",
        "primary.key.fields": "id",
        "transforms": "route",
        "primary.key.mode": "record_key"
   }
}