`Incompatible schema between results and sink` error when inserting into a topic with a predefined schema


Jinli Liang

Oct 5, 2020, 10:15:25 PM
to ksqldb-users
Hi ksqldb users,

I've run into issues with the error `Incompatible schema between results and sink.` when trying to insert into a stream whose topic has a schema pre-defined in Schema Registry.

KSQLDB version: 0.12.0

Steps to reproduce:

- Create Topic `foobar_0` with Schema
```
{
  "fields": [
    {
      "name": "col1",
      "type": "string"
    },
    {
      "name": "col2",
      "type": "string"
    },
    {
      "name": "col3",
      "type": "string"
    }
  ],
  "name": "value_foobar_0",
  "namespace": "foobar",
  "type": "record"
}
```

- Create Stream `stream_foobar_0`
```
CREATE OR REPLACE STREAM stream_foobar_0 (
    col1 string,
    col2 string,
    col3 string
) WITH (
    KAFKA_TOPIC='foobar_0',
    VALUE_FORMAT='AVRO'
);
```

- Create Topic `foobar_1` with Schema
```
{
  "fields": [
    {
      "name": "col1",
      "type": "string"
    },
    {
      "name": "col2",
      "type": "string"
    }
  ],
  "name": "value_foobar_1",
  "namespace": "foobar",
  "type": "record"
}
```

- Create Stream `stream_foobar_1`
```
CREATE OR REPLACE STREAM stream_foobar_1 (
    col1 string,
    col2 string
) WITH (
    KAFKA_TOPIC='foobar_1',
    VALUE_FORMAT='AVRO'
);
```

- Create Topic `foobar_2` with Schema
```
{
  "fields": [
    {
      "name": "col1",
      "type": "string"
    },
    {
      "name": "col2",
      "type": "string"
    }
  ],
  "name": "value_foobar_2",
  "namespace": "foobar",
  "type": "record"
}
```

- Create Stream `stream_foobar_2`
```
CREATE OR REPLACE STREAM stream_foobar_2 (
    col1 string,
    col3 string
) WITH (
    KAFKA_TOPIC='foobar_2',
    VALUE_FORMAT='AVRO'
);
```



- The following query returns an incompatible schema error
```
INSERT INTO stream_foobar_0
SELECT
    x.col1,
    x.col2,
    y.col3
FROM
    stream_foobar_1 AS x
INNER JOIN
    stream_foobar_2 AS y
WITHIN 1 HOUR ON
    x.col1 = y.col1
EMIT CHANGES;
```
Returns error
```
Incompatible schema between results and sink.
Result schema is
`X_COL1` STRING KEY,
`COL2` STRING,
`COL3` STRING

Sink schema is
`COL1` STRING,
`COL2` STRING,
`COL3` STRING
```


- Try again with an intermediate stream
```
CREATE OR REPLACE STREAM foo_test_0  WITH (VALUE_FORMAT='AVRO') AS
SELECT
    x.col1 AS col1,
    x.col2 AS col2,
    y.col3 AS col3
FROM
    stream_foobar_1 AS x
INNER JOIN
    stream_foobar_2 AS y
WITHIN 1 HOUR ON
    x.col1 = y.col1
EMIT CHANGES;


INSERT INTO stream_foobar_0
SELECT
    x.col1,
    x.col2,
    x.col3
FROM foo_test_0 AS x
EMIT CHANGES;
```
Returns error
```
Incompatible schema between results and sink.
Result schema is
`COL1` STRING KEY,
`COL2` STRING,
`COL3` STRING

Sink schema is
`COL1` STRING,
`COL2` STRING,
`COL3` STRING
```

Also note that the schema automatically generated for stream `foo_test_0` does not include the key column (ksqlDB serializes key columns into the Kafka record key, not the value):
```
{
  "connect.name": "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema",
  "fields": [
    {
      "default": null,
      "name": "COL2",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "COL3",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}
```

It appears to have something to do with https://docs.ksqldb.io/en/latest/concepts/schemas/#schema-inference , but I am not 100% sure about it.
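As a side note on schema inference: under Schema Registry's default TopicNameStrategy, value schemas live under the subject `<topic>-value`, so you can inspect what ksqlDB actually registered for `foobar_0` via the registry's REST API. A minimal sketch (the helper names are made up, and `localhost:8081` is an assumed registry address):

```python
# Sketch: where to look in Schema Registry for the schema ksqlDB uses.
# Assumes the default TopicNameStrategy, under which value schemas are
# registered under the subject "<topic>-value".

def value_subject(topic: str) -> str:
    """Subject name for a topic's value schema (TopicNameStrategy)."""
    return f"{topic}-value"

def latest_schema_url(registry: str, topic: str) -> str:
    """REST endpoint returning the latest registered schema version."""
    return f"{registry}/subjects/{value_subject(topic)}/versions/latest"

# Fetch with e.g.: curl <url>
print(latest_schema_url("http://localhost:8081", "foobar_0"))
# → http://localhost:8081/subjects/foobar_0-value/versions/latest
```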

The main use case for having the schema predefined in Schema Registry is that we can statically generate native class files and ship them with the application.
So far, most of the tutorials online seem to create the topics/streams on the fly with auto-generated schemas.

Thanks!

Jinli Liang

Oct 5, 2020, 11:38:22 PM
to ksqldb-users
With some reading of https://github.com/confluentinc/ksql/blob/master/CHANGELOG.md#key-columns-required-in-projection

Using `as_value` gives a different error:
```
INSERT INTO stream_foobar_0
SELECT
    as_value(x.col1) AS col1,
    x.col2,
    x.col3
FROM foo_test_0 AS x
EMIT CHANGES;
```

Error produced:
```
Key missing from projection. The query used to build `STREAM_FOOBAR_0` must include the key column COL1 in its projection.
```

Jinli Liang

Oct 5, 2020, 11:59:49 PM
to ksqldb-users
With some extra reading on partial schema inference (https://www.confluent.io/blog/ksqldb-0-10-updates-key-columns/), this seems to work:

- declare a stream over the same topic with an explicit key column
```
CREATE OR REPLACE STREAM stream_foobar_00 (distkey string key) WITH (
    KAFKA_TOPIC='foobar_0',
    VALUE_FORMAT='AVRO'
);
```

- insert with the key column (`distkey`) in the projection
```
INSERT INTO stream_foobar_00
SELECT
    x.col1 as distkey,
    as_value(x.col1) as col1,
    x.col2,
    x.col3
FROM foo_test_0 AS x
EMIT CHANGES;
```



Jinli Liang

Oct 6, 2020, 12:57:57 AM
to ksqldb-users
^ The above appears to work; however, as soon as data is emitted from the push query, the schema of the topic is overwritten.

Original schema of topic `foobar_0`:
```
{
  "fields": [
    {
      "name": "col1",
      "type": "string"
    },
    {
      "name": "col2",
      "type": "string"
    },
    {
      "name": "col3",
      "type": "string"
    }
  ],
  "name": "value_foobar_0",
  "namespace": "foobar",
  "type": "record"
}
```

As soon as data is generated by the `INSERT`, the schema changes to
```
{
  "fields": [
    {
      "default": null,
      "name": "col1",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "col2",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "col3",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}
```

This does not really follow the default backward compatibility of Schema Registry.
Is this behaviour expected?
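For what it's worth, comparing the two schema documents above with the stdlib shows exactly two changes: every required string field becomes a nullable union with a `null` default, and the record's fully qualified name changes, which Avro schema resolution is sensitive to. A quick sketch:

```python
import json

# Diff the original schema against the one ksqlDB registered.
# Both documents are copied from the messages above.
original = json.loads("""
{"fields": [{"name": "col1", "type": "string"},
            {"name": "col2", "type": "string"},
            {"name": "col3", "type": "string"}],
 "name": "value_foobar_0", "namespace": "foobar", "type": "record"}
""")

rewritten = json.loads("""
{"fields": [{"default": null, "name": "col1", "type": ["null", "string"]},
            {"default": null, "name": "col2", "type": ["null", "string"]},
            {"default": null, "name": "col3", "type": ["null", "string"]}],
 "name": "KsqlDataSourceSchema",
 "namespace": "io.confluent.ksql.avro_schemas", "type": "record"}
""")

# Every required string field became an optional (nullable) union.
changed = {f["name"]: (o["type"], f["type"])
           for o, f in zip(original["fields"], rewritten["fields"])}
print(changed)  # {'col1': ('string', ['null', 'string']), ...}

# The record's fully qualified name changed as well.
print(original["namespace"] + "." + original["name"],
      "->", rewritten["namespace"] + "." + rewritten["name"])
```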

Jinli Liang

Oct 12, 2020, 6:53:58 PM
to ksqldb-users
These two issues seem related.
Is there some sort of workaround?
One I could think of would be to create the topics/schemas via ksqlDB and roll our own consumer/producer to write to the topics with the "correct" schemas.
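For the roll-your-own-producer route, a rough sketch using confluent-kafka-python (the client choice, broker/registry addresses, and key handling here are all assumptions, not something from this thread): setting `'auto.register.schemas': False` on the Avro serializer makes the producer reuse the pre-registered schema and never register/overwrite the subject.

```python
# Sketch: produce to foobar_0 with the predefined schema, refusing to
# register a new one. Assumes confluent-kafka[avro] is installed and a
# broker/registry are running at the placeholder addresses below.
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer

schema_str = """
{"type": "record", "name": "value_foobar_0", "namespace": "foobar",
 "fields": [{"name": "col1", "type": "string"},
            {"name": "col2", "type": "string"},
            {"name": "col3", "type": "string"}]}
"""

registry = SchemaRegistryClient({"url": "http://localhost:8081"})
value_serializer = AvroSerializer(
    registry,
    schema_str,
    conf={"auto.register.schemas": False},  # never overwrite the subject
)

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9092",
    "key.serializer": StringSerializer("utf_8"),
    "value.serializer": value_serializer,
})

producer.produce(
    topic="foobar_0",
    key="some-key",
    value={"col1": "a", "col2": "b", "col3": "c"},
)
producer.flush()
```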