I've run into the error `Incompatible schema between results and sink.` when trying to insert into a stream whose topic has a schema pre-defined in the Schema Registry.
- Create Topic `foobar_0` with Schema
```
{
  "fields": [
    { "name": "col1", "type": "string" },
    { "name": "col2", "type": "string" },
    { "name": "col3", "type": "string" }
  ],
  "name": "value_foobar_0",
  "namespace": "foobar",
  "type": "record"
}
```
- Create Stream `stream_foobar_0`
```
CREATE OR REPLACE STREAM stream_foobar_0 (
    col1 STRING,
    col2 STRING,
    col3 STRING
) WITH (
    KAFKA_TOPIC='foobar_0',
    VALUE_FORMAT='AVRO'
);
```
- Create Topic `foobar_1` with Schema
```
{
  "fields": [
    { "name": "col1", "type": "string" },
    { "name": "col2", "type": "string" }
  ],
  "name": "value_foobar_s1",
  "namespace": "foobar",
  "type": "record"
}
```
- Create Stream `stream_foobar_1`
```
CREATE OR REPLACE STREAM stream_foobar_1 (
    col1 STRING,
    col2 STRING
) WITH (
    KAFKA_TOPIC='foobar_1',
    VALUE_FORMAT='AVRO'
);
```
- Create Topic `foobar_2` with Schema
```
{
  "fields": [
    { "name": "col1", "type": "string" },
    { "name": "col2", "type": "string" }
  ],
  "name": "value_foobar_2",
  "namespace": "foobar",
  "type": "record"
}
```
- Create Stream `stream_foobar_2`
```
CREATE OR REPLACE STREAM stream_foobar_2 (
    col1 STRING,
    col3 STRING
) WITH (
    KAFKA_TOPIC='foobar_2',
    VALUE_FORMAT='AVRO'
);
```
- The following query returns an incompatible-schema error
```
INSERT INTO stream_foobar_0
SELECT
    x.col1,
    x.col2,
    y.col3
FROM stream_foobar_1 AS x
INNER JOIN stream_foobar_2 AS y
    WITHIN 1 HOUR ON x.col1 = y.col1
EMIT CHANGES;
```
Returns the error
```
Incompatible schema between results and sink.
Result schema is
`X_COL1` STRING KEY,
`COL2` STRING,
`COL3` STRING
Sink schema is
`COL1` STRING,
`COL2` STRING,
`COL3` STRING
```
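The result schema marks the join key as a key column (`X_COL1` STRING KEY), while `stream_foobar_0` declares all three columns as value columns, which appears to be the source of the mismatch. One possible workaround (a sketch only; `KEY` column names and aliasing behavior here are assumptions I have not verified against this exact setup) is to declare the key column explicitly in the sink stream and alias the join key in the `SELECT`:
```
-- Sketch of a possible workaround (untested here): declare col1 as the
-- sink's key column so it lines up with the join result's key column.
CREATE OR REPLACE STREAM stream_foobar_0 (
    col1 STRING KEY,
    col2 STRING,
    col3 STRING
) WITH (
    KAFKA_TOPIC='foobar_0',
    VALUE_FORMAT='AVRO'
);

INSERT INTO stream_foobar_0
SELECT
    x.col1 AS col1,  -- alias the join key so it matches the sink's KEY column name
    x.col2,
    y.col3
FROM stream_foobar_1 AS x
INNER JOIN stream_foobar_2 AS y
    WITHIN 1 HOUR ON x.col1 = y.col1
EMIT CHANGES;
```
Note that this changes the sink's declared schema, so it may in turn conflict with the schema already registered for `foobar_0` in the Schema Registry.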
- Try again with an intermediate stream
```
CREATE OR REPLACE STREAM foo_test_0 WITH (VALUE_FORMAT='AVRO') AS
SELECT
    x.col1 AS col1,
    x.col2 AS col2,
    y.col3 AS col3
FROM stream_foobar_1 AS x
INNER JOIN stream_foobar_2 AS y
    WITHIN 1 HOUR ON x.col1 = y.col1
EMIT CHANGES;

INSERT INTO stream_foobar_0
SELECT
    x.col1,
    x.col2,
    x.col3
FROM foo_test_0 AS x
EMIT CHANGES;
```
Returns the error
```
Incompatible schema between results and sink.
Result schema is
`COL1` STRING KEY,
`COL2` STRING,
`COL3` STRING
Sink schema is
`COL1` STRING,
`COL2` STRING,
`COL3` STRING
```
Also note that the schema generated automatically for stream `foo_test_0` does not include the key column:
```
{
  "connect.name": "io.confluent.ksql.avro_schemas.KsqlDataSourceSchema",
  "fields": [
    { "default": null, "name": "COL2", "type": ["null", "string"] },
    { "default": null, "name": "COL3", "type": ["null", "string"] }
  ],
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "type": "record"
}
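This seems to be because the join key (`COL1`) is written to the Kafka record key rather than the value, so only the value columns end up in the auto-registered value schema. If `col1` also needs to appear in the value schema, one sketch of a workaround (untested here; assumes the `AS_VALUE` function is available in the ksqlDB version in use) is to copy the key back into a value column:
```
-- Sketch (untested): AS_VALUE copies the join key into a value column,
-- so COL1 would appear in the auto-registered value schema next to COL2/COL3.
CREATE OR REPLACE STREAM foo_test_0 WITH (VALUE_FORMAT='AVRO') AS
SELECT
    x.col1 AS key_col1,         -- remains the key column
    AS_VALUE(x.col1) AS col1,   -- copied into the value schema
    x.col2 AS col2,
    y.col3 AS col3
FROM stream_foobar_1 AS x
INNER JOIN stream_foobar_2 AS y
    WITHIN 1 HOUR ON x.col1 = y.col1
EMIT CHANGES;
```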
The main use case for having the schema predefined in the Schema Registry is that we can generate native class files statically and ship them with the application. So far, most of the tutorials online seem to create the topics/streams on the fly with auto-generated schemas.