Wrong result when joining stream and table

Hieu Lam Tri

Mar 16, 2021, 11:17:27 PM
to ksqldb-users
Hi, 
I have a stream like this 
Stream Transaction (key format Avro, value format Avro)
TransactionId, date, UserId
and table Users (key format Avro, value format Avro)
Id, user_uuid

I join stream Transaction and Users with 

Create Stream Transaction_Join as 
  select t.transactionId, t.date,u.id,u.user_uuid
  from Transaction t
  left join Users u on t.userId = u.id
  emit changes;

The number of records in Transaction and in the resulting stream is the same, which is expected.
But some of the records in Transaction_Join have a null user_uuid. I checked the source stream and table, and they all have valid values.
Is this a bug, or am I missing something?
Can I use Avro as the key format, or do I need to change it to the default KAFKA format for the join to work?

Thanks 

Matthias J. Sax

Mar 24, 2021, 1:14:38 PM
to ksql-...@googlegroups.com
Records are processed in timestamp order. Thus, if a table-side record
has a larger timestamp than a stream-side record, the stream-side record
is processed first and finds no match in the table.

If you want to ensure that the table is loaded first, you need to ensure
that the table-side records have smaller timestamps than the stream-side
records.

-Matthias
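
A minimal sketch of the ordering described above, written as a plain Python simulation rather than ksqlDB itself. The record layout, the sample data, and the rule that a table update goes first on equal timestamps are all assumptions made for illustration. It shows how a transaction whose timestamp is smaller than that of the matching user record is joined against an empty table entry and comes out with a null value.

```python
from typing import Dict, List, Optional, Tuple

# (timestamp_ms, key, value) tuples; names and data are illustrative only.
Record = Tuple[int, str, str]


def left_join_in_timestamp_order(
    stream: List[Record], table: List[Record]
) -> List[Tuple[str, Optional[str]]]:
    """Merge both inputs by timestamp and left-join each stream record
    against the table state as of that point in the merged order."""
    # Tag each record with its side. On equal timestamps the table update
    # is applied first (0 sorts before 1); this is an assumption of the sketch.
    merged = [(ts, 0, key, value) for ts, key, value in table]
    merged += [(ts, 1, key, value) for ts, key, value in stream]
    merged.sort(key=lambda r: (r[0], r[1]))

    table_state: Dict[str, str] = {}
    output: List[Tuple[str, Optional[str]]] = []
    for _ts, side, key, value in merged:
        if side == 0:
            table_state[key] = value  # table record: upsert by key
        else:
            # stream record: look up the current table state (None if absent)
            output.append((value, table_state.get(key)))
    return output


# User u2 is written to the table topic at t=300, after transaction tx-2 (t=250).
users = [(100, "u1", "uuid-1"), (300, "u2", "uuid-2")]
transactions = [(200, "u1", "tx-1"), (250, "u2", "tx-2"), (400, "u2", "tx-3")]

result = left_join_in_timestamp_order(transactions, users)
print(result)
```

In this toy data only tx-2 is affected: it is the one stream record that is older than its table-side counterpart.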

> --
> You received this message because you are subscribed to the Google
> Groups "ksqldb-users" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to ksql-users+...@googlegroups.com
> <mailto:ksql-users+...@googlegroups.com>.
> To view this discussion on the web, visit
> https://groups.google.com/d/msgid/ksql-users/6005a124-969e-44cf-a9bf-7759475e3ea6n%40googlegroups.com
> <https://groups.google.com/d/msgid/ksql-users/6005a124-969e-44cf-a9bf-7759475e3ea6n%40googlegroups.com?utm_medium=email&utm_source=footer>.

Hieu Lam Tri

Apr 24, 2021, 12:29:14 AM
to ksqldb-users
Hi Matthias,

I am still not clear on this one. I have a stream that I join with a table.
I can make sure that I create all the required tables first, so I think the timestamps on the table side would be earlier than those on the stream side.
But when I do the join, some stream records still have a null join value.

Any ideas?

Regards,
Hieu Lam 




Hieu Lam Tri

Apr 24, 2021, 1:12:57 AM
to ksqldb-users
Hi Matthias,

Let me elaborate on what I am currently doing so that you can get a better understanding.
I use the Debezium connector to stream data from a source PostgreSQL database to Kafka.
So I have the following topics:
transaction(id, date, user_id, ....)
user(id, name, address)

I create the stream and the table and do the join as follows:
create stream user -> stream created successfully
create table user as 
  select * from user group by id emit changes; -> table created successfully (I waited until all the records were created and checked the timestamps)

create stream transaction; -> stream created successfully (I made sure that the timestamps of all records in this stream are later than those in the table)
create stream join as
  select xx, yy, zz, u.name as user_name
  from transaction t
  left join user u on t.user_id = u.id

But when I run the query
select * from join where user_name is null emit changes;

there are a lot of records with a null user_name.

I would expect a stream-table join to work fine for this very simple use case.
Any ideas?

Regards,
Hieu Lam 

Matthias J. Sax

Apr 25, 2021, 4:23:59 PM
to ksql-...@googlegroups.com
Well, it does not matter when you issue the CREATE STREAM or CREATE
TABLE statement.

Each record has a timestamp that is stored in the Kafka topic, and those
timestamps are set _on write_, i.e., when the data is appended to the
topic. By default, those record timestamps are used during processing.

I am not familiar with the details of Debezium, but if Debezium sets
those timestamps on write for your table data, then those are the
timestamps that will be used.

Check out the following two talks, which should help:
- https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
- https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/


-Matthias



Hieu Lam Tri

Apr 25, 2021, 10:44:25 PM
to ksqldb-users
Hi Matthias,
Thanks for getting back to me.
I think a bug has already been logged for this: https://github.com/confluentinc/ksql/issues/3997
I am pretty sure I am doing the same thing, and I made sure that the records in my stream have larger timestamps than those on the table side.
However, the join still does not succeed.
It is somewhat disappointing for such a simple use case, and I may need to switch to Kafka Streams to see whether it works properly there.
Let me know if you also think this could be a bug in ksqlDB.

Regards,
Hieu Lam

Matthias J. Sax

Apr 26, 2021, 12:39:09 PM
to ksql-...@googlegroups.com
The ticket you linked is quite old, and a lot of improvements with regard
to timestamp synchronization are included in newer versions.

If there is still an issue, try increasing the configuration parameter
`max.task.idle.ms`, as described in the talks I linked previously.


-Matthias
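
To illustrate why a larger `max.task.idle.ms` can help, here is a toy Python model of a single task that reads a stream input and a table input. It is a sketch under stated assumptions, not Kafka Streams' actual implementation: the function name `simulate_task`, the record layout, and the waiting logic are all hypothetical. The idea it models is that when one input has no buffered data, the task waits up to `max_idle_ms` for that input to catch up before processing what it already has.

```python
from collections import deque
from typing import Deque, Dict, List, Optional, Tuple

# (arrival_ms, timestamp_ms, key, value): arrival_ms is the wall-clock time at
# which the task receives the record; timestamp_ms is the record's own timestamp.
Arrival = Tuple[int, int, str, str]


def simulate_task(
    stream_in: List[Arrival], table_in: List[Arrival], max_idle_ms: int
) -> List[Tuple[str, Optional[str]]]:
    """Toy model of one task doing a stream-table left join."""
    pending_s: Deque[Arrival] = deque(sorted(stream_in))
    pending_t: Deque[Arrival] = deque(sorted(table_in))
    buf_s: Deque[Arrival] = deque()
    buf_t: Deque[Arrival] = deque()
    table_state: Dict[str, str] = {}
    output: List[Tuple[str, Optional[str]]] = []
    now = 0
    idle_since: Optional[int] = None

    while pending_s or pending_t or buf_s or buf_t:
        # Move everything that has arrived by `now` into the input buffers.
        while pending_s and pending_s[0][0] <= now:
            buf_s.append(pending_s.popleft())
        while pending_t and pending_t[0][0] <= now:
            buf_t.append(pending_t.popleft())

        if buf_s and buf_t:
            # Both inputs have data: pick the smaller timestamp (table on ties).
            idle_since = None
            take_table = buf_t[0][1] <= buf_s[0][1]
        elif buf_s or buf_t:
            # One input is empty: wait up to max_idle_ms for it to catch up.
            if idle_since is None:
                idle_since = now
            deadline = idle_since + max_idle_ms
            if now < deadline:
                upcoming = [p[0][0] for p in (pending_s, pending_t) if p]
                now = min(upcoming + [deadline])
                continue
            take_table = bool(buf_t)
        else:
            # Nothing buffered yet: jump ahead to the next arrival.
            now = min(p[0][0] for p in (pending_s, pending_t) if p)
            continue

        if take_table:
            _, _, key, value = buf_t.popleft()
            table_state[key] = value
        else:
            _, _, key, value = buf_s.popleft()
            output.append((value, table_state.get(key)))
    return output


# The table record is older (timestamp 100 < 200) but reaches the task later.
stream_in = [(10, 200, "u1", "tx-1")]
table_in = [(50, 100, "u1", "uuid-1")]
no_wait = simulate_task(stream_in, table_in, max_idle_ms=0)
with_wait = simulate_task(stream_in, table_in, max_idle_ms=100)
print(no_wait)
print(with_wait)
```

In this model, correct timestamps alone are not enough: without idling, the stream record is processed before the older table record has even been fetched.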


Ren Yang

Apr 26, 2021, 11:47:15 PM
to Hieu Lam Tri, ksqldb-users
Hi Matthias,
Regarding your comment "...you need to ensure that the table-side records have smaller timestamps than the stream-side records": can I manually increase the stream's timestamps so that they are larger than those on the table side, for example by adding 5000 milliseconds to created_dt? I am using the users stream LEFT JOIN the orders table. In my business system, the users topic usually has smaller timestamps than the orders topic.

For example:
CREATE STREAM s_users
WITH (
  KAFKA_TOPIC = 's_users',
  VALUE_FORMAT = 'avro',
  TIMESTAMP = 'created_dt'
)
AS SELECT id, first_name, last_name, gender,
  created_dt + 5000 AS created_dt  -- shift the event time by 5 seconds
FROM users
PARTITION BY id;

--
Ren Yang

Thanks & Regards 

Matthias J. Sax

Apr 29, 2021, 11:26:40 AM
to ksql-...@googlegroups.com
Pre-processing the original input stream to create a derived stream
with "shifted" timestamps, and then using the derived stream in the
stream-table join, should work.

Of course, this duplicates your storage footprint.

-Matthias
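
The effect of shifting can be sketched with a small Python simulation. This is an illustration only: the helper names, the sample records, and the assumption that both sides share the same join key are hypothetical, and the join is modeled as a simple timestamp-ordered merge rather than ksqlDB's real execution.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str, str]  # (timestamp_ms, join_key, value)


def shift_timestamps(records: List[Record], shift_ms: int) -> List[Record]:
    """Derive a new stream whose record timestamps are moved forward."""
    return [(ts + shift_ms, key, value) for ts, key, value in records]


def stream_table_left_join(
    stream: List[Record], table: List[Record]
) -> List[Tuple[str, Optional[str]]]:
    """Process both inputs in timestamp order (table first on ties)."""
    events = [(ts, 0, key, value) for ts, key, value in table]
    events += [(ts, 1, key, value) for ts, key, value in stream]
    events.sort(key=lambda e: (e[0], e[1]))

    state: Dict[str, str] = {}
    joined: List[Tuple[str, Optional[str]]] = []
    for _ts, side, key, value in events:
        if side == 0:
            state[key] = value  # table record: upsert by key
        else:
            joined.append((value, state.get(key)))  # stream record: lookup
    return joined


# The user record (t=1000) is older than the matching order record (t=3000).
users = [(1000, "42", "alice")]
orders = [(3000, "42", "order-7")]

unshifted = stream_table_left_join(users, orders)
shifted = stream_table_left_join(shift_timestamps(users, 5000), orders)
print(unshifted)
print(shifted)
```

The shift has to be at least as large as the gap between the two topics' timestamps; a 5000 ms shift would not help if the order were written more than 5 seconds after the user.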