bytes instead of int


Florian Huc

Apr 21, 2017, 5:34:35 AM
to Confluent Platform
Hi,
I am exploring Kafka and facing an issue with a consumer returning bytes where I would expect an int.

Here is what I do:

I have a table: create table mytest3( my_id NUMBER(5) not null, MY_TIMESTAMP TIMESTAMP not null, my_other_id NUMBER not null, my_double BINARY_DOUBLE);

I start the various services:
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties&
./bin/kafka-server-start ./etc/kafka/server.properties&
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties&

Then I start a connector: ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/source-oracle-sql.properties
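For reference, a minimal source-oracle-sql.properties for this kind of setup would look roughly like the following; the connection details are placeholders and the topic prefix is inferred from the subject name shown below:

name=source-oracle-sql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# placeholder connection details for a local Oracle instance
connection.url=jdbc:oracle:thin:@localhost:1521:XE
connection.user=myuser
connection.password=mypassword
table.whitelist=MYTEST3
# bulk mode re-reads the whole table on each poll; timestamp/incrementing modes are also possible
mode=bulk
topic.prefix=test-sql-jdbc-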

This in turn creates a schema automatically:

curl -X GET -i -H "Content-Type: application/vnd.schemaregistry.v1+json" http://localhost:8081/subjects/test-sql-jdbc-MYTEST3-value/latest

HTTP/1.1 200 OK

Date: Fri, 21 Apr 2017 09:08:42 GMT

Content-Type: application/vnd.schemaregistry.v1+json

Content-Length: 799

Server: Jetty(9.2.12.v20150709)

 

{"subject":"test-sql-jdbc-MYTEST3-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"MYTEST3\",\"fields\":[{\"name\":\"MY_ID\",\"type\":{\"type\":\"bytes\",\"scale\":0,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"0\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}},{\"name\":\"MY_TIMESTAMP\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"org.apache.kafka.connect.data.Timestamp\",\"logicalType\":\"timestamp-millis\"}},{\"name\":\"MY_OTHER_ID\",\"type\":{\"type\":\"bytes\",\"scale\":0,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"0\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}}],\"connect.name\":\"MYTEST3\"}"}


but when I read with a consumer, the bytes are not transformed into int: {"MY_ID":"\u0001","MY_TIMESTAMP":1491350400000,"MY_OTHER_ID":"v.Ôg!Ñò��ÅZg_4÷)�� °úoÏÑ\u000BeÍ\u001Eo·rÍ\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000"}
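For context, a typical way to read this topic is with the Avro console consumer shipped with the platform (the exact command used here is an assumption):

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-sql-jdbc-MYTEST3 --from-beginning --property schema.registry.url=http://localhost:8081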



I then changed the schema by registering a new version:

curl -X GET -i -H "Content-Type: application/vnd.schemaregistry.v1+json" http://localhost:8081/subjects/test-sql-jdbc-MYTEST3-value/versions/latest


returns:

HTTP/1.1 200 OK

Date: Fri, 21 Apr 2017 09:04:52 GMT

Content-Type: application/vnd.schemaregistry.v1+json

Content-Length: 255

Server: Jetty(9.2.12.v20150709) 


{"subject":"test-sql-jdbc-MYTEST3-value","version":2,"id":3,"schema":"{\"type\":\"record\",\"name\":\"MYTEST3\",\"fields\":[{\"name\":\"MY_ID\",\"type\":\"int\"},{\"name\":\"MY_TIMESTAMP\",\"type\":\"long\"},{\"name\":\"MY_OTHER_ID\",\"type\":\"int\"}]}"}


 

But I still get the same result from the consumer, even after restarting everything.

So I have two questions:

1. Why is my latest schema not used?

2. How can I get the connector to create a schema from which the consumer will give me an int directly?


Thanks in advance for your help!

Florian





Ewen Cheslack-Postava

Apr 21, 2017, 3:03:44 PM
to Confluent Platform
Registering a new schema in the schema registry won't impact how the connector behaves -- it will always use a schema as determined by the input source.

The problem you're having is that Oracle is exposing that column as a Decimal in order to support very high precision. The connector converts that to the logical type for Decimal, which is a type that has an encoding in one of the basic types provided by Connect but can be tagged with extra information to indicate how it should be decoded. For Decimal, the underlying type is bytes, but you'll notice in the schema that there's some extra information:

{\"type\":\"bytes\",\"scale\":0,\"precision\":64,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"0\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}},

This allows downstream consumers to decode it properly. There are a couple of issues with this currently. Some Avro logical types are not being translated correctly by the Converter: https://github.com/confluentinc/schema-registry/issues/522. With that issue fixed, I think your consumer would at least see the proper Decimal decoding (although that may not be what you are expecting, a Decimal can easily be converted to an int/long).
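As a sketch of what that decoding involves -- assuming the standard Connect Decimal encoding, where the bytes are the unscaled value as big-endian two's-complement and the scale comes from the schema parameters -- a consumer could do something like:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class DecimalDecode {
    // Decode a Connect Decimal field: the payload is the unscaled value as
    // big-endian two's-complement bytes; the scale comes from the schema
    // (here scale = 0, from "connect.parameters":{"scale":"0"}).
    static BigDecimal decodeDecimal(ByteBuffer buf, int scale) {
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        // Example: the single byte 0x01 with scale 0 decodes to 1,
        // which matches the MY_ID value "\u0001" seen above.
        BigDecimal myId = decodeDecimal(ByteBuffer.wrap(new byte[] {0x01}), 0);
        System.out.println(myId.intValueExact()); // prints 1
    }
}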

The other issue is on the source connector side: https://github.com/confluentinc/kafka-connect-jdbc/issues/101. The idea is to allow overriding the type of columns in cases where the database exposes them as an unexpected type. This may actually be addressed by single message transforms; there is a WIP Cast transformation that might help here: https://github.com/apache/kafka/pull/2458
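If/when that transformation lands, the idea would be to configure it on the source connector, roughly along these lines (the exact class name and config keys may differ from what finally gets merged):

transforms=castTypes
transforms.castTypes.type=org.apache.kafka.connect.transforms.Cast$Value
transforms.castTypes.spec=MY_ID:int32,MY_OTHER_ID:int64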

-Ewen

