How to solve duplicated kafka messages and aggregation?

kriticar

Nov 13, 2019, 8:10:16 AM
to ClickHouse
Hi,

We are using a Kafka table -> materialized view -> final_table pipeline to ingest data.

It looks like the Kafka engine is not reliable: we are facing an issue in which a single Kafka message is written twice (or more than twice) to final_table.

There is also agg_view, which aggregates data from final_table.

CREATE MATERIALIZED VIEW agg_view
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{layer}-{shard}/...
as select ... from final_table

How can we deal with duplicates in final_table so that only a single copy of each row is aggregated?

I hoped I would be able to use a collapsing engine for final_table, but then I run into the problem that the aggregating materialized view cannot use the FINAL keyword.

CREATE MATERIALIZED VIEW agg_view
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{layer}-{shard}/...
as select ... from final_table FINAL


"
SQL Error [397]: ClickHouse exception, code: 397, host: localhost, port: 8123; Code: 397, e.displayText() = DB::Exception: There was an error on [node01:9000]: Code: 397, e.displayText() = DB::Exception: MATERIALIZED VIEW cannot have PREWHERE, SAMPLE or FINAL. (version 19.16.3.6) (version 19.16.3.6)
"

How can I aggregate when there are duplicates?

Regards,

Zoran

Denis Zhuravlev

Nov 13, 2019, 9:31:42 AM
to ClickHouse
An MV never reads the real table. An MV is an insert trigger: the INSERT command pushes the inserted buffer to it.

You can remove duplicates from the inserted buffer using SQL, but that will not actually help you, because the duplicates will always be in different buffers.
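
To make the insert-trigger behavior concrete, here is a small Python simulation. It is only a toy model, not ClickHouse code: `ToyMaterializedView`, `dedup_block`, and the sample messages are hypothetical names introduced for illustration. It assumes the view deduplicates each inserted block on its own and never looks at rows that are already stored.

```python
# Toy model of a materialized view acting as an insert trigger.
# All names here are hypothetical; this is not ClickHouse code.

def dedup_block(block):
    """Drop duplicate rows inside one inserted block, keeping first occurrences."""
    seen = set()
    unique = []
    for row in block:
        if row not in seen:
            seen.add(row)
            unique.append(row)
    return unique


class ToyMaterializedView:
    """Sees only the block being inserted, never the rows already stored."""

    def __init__(self):
        self.target = []  # stands in for the view's target table

    def on_insert(self, block):
        self.target.extend(dedup_block(block))


mv = ToyMaterializedView()

# A duplicate inside a single block is removed...
mv.on_insert([("msg-1", 10), ("msg-1", 10), ("msg-2", 20)])
# ...but a redelivered message arrives in a later block and slips through.
mv.on_insert([("msg-2", 20), ("msg-3", 30)])

duplicates = len(mv.target) - len(set(mv.target))
print(mv.target)
print("duplicate rows in target:", duplicates)
```

In this model the second copy of `msg-2` survives because it arrives in a different block from the first, which is the situation described above.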

My advice is "do not use Kafka engine". https://github.com/housepower/clickhouse_sinker

kriticar

Nov 14, 2019, 2:51:57 AM
to ClickHouse
To keep complexity down, I don't want to introduce a non-ClickHouse component.
For now this is my idea.

Kafka table -> MV -> Collapsing table

A cron job will move data from the Collapsing table to a MergeTree table with a delay.

The cron job will check the last date that has been transferred, optimize the partitions that have not yet been transferred (up to the delay), and transfer the optimized partitions to the MergeTree table.
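
A rough sketch of what such a cron step could look like, written as a Python helper that only builds the SQL statements and does not run them. Everything concrete here is an assumption, not something stated in the thread: the table names `collapsing_table` and `merge_tree_table`, the `event_date` column, daily partitions keyed by `toYYYYMMDD(event_date)`, and both tables sharing the same columns.

```python
from datetime import date, timedelta

# Hypothetical names: collapsing_table, merge_tree_table, event_date,
# and daily partitions keyed by toYYYYMMDD(event_date).


def pending_days(last_transferred, today, delay_days):
    """Days after last_transferred that are at least delay_days old."""
    cutoff = today - timedelta(days=delay_days)
    day = last_transferred + timedelta(days=1)
    days = []
    while day <= cutoff:
        days.append(day)
        day += timedelta(days=1)
    return days


def statements_for(day):
    """SQL to collapse one daily partition, then copy it to the MergeTree table."""
    partition = day.strftime("%Y%m%d")
    return [
        f"OPTIMIZE TABLE collapsing_table PARTITION {partition} FINAL",
        "INSERT INTO merge_tree_table "
        f"SELECT * FROM collapsing_table WHERE event_date = '{day.isoformat()}'",
    ]


plan = []
for d in pending_days(date(2019, 11, 10), date(2019, 11, 14), delay_days=2):
    plan.extend(statements_for(d))

for sql in plan:
    print(sql)
```

The generated statements would still have to be executed in order, for example through clickhouse-client. A duplicate that arrives after its partition has already been transferred would not be caught by this scheme, so the delay has to be longer than the latest expected redelivery.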

The whole process exists only because the Kafka table is not reliable with respect to duplicate messages.
I don't have a better option for now.

Regards.

Denis Zhuravlev

Nov 14, 2019, 8:54:09 AM
to ClickHouse
It could work for small data volumes.

kriticar

Nov 22, 2019, 7:29:22 AM
to ClickHouse
I am doing it for 1 billion (not million) rows of data per day.
It looks like it is working.