Error while creating ksql stream and table


Shivam Bansal

Aug 10, 2023, 2:36:08 AM
to ksqldb-users

Hi team,

I've just started working with ksqlDB for real-time streaming.

I'm using the open-source distribution to set up ksqlDB. I'm trying to set up a cluster of 3 worker nodes, and I'm using the queries below to create a ksqlDB stream and table.

create stream ksql_deposit_table_final_stream_prod (
    userID bigint,
    txnInitiatedTime timestamp,
    Amount decimal(19, 4),
    status int,
    lastUpdateTime timestamp
) WITH (
    kafka_topic='JWR-SEC-Prod-JWR-DepositTransactions-v4.dbo.DepositTransactions',
    timestamp='lastUpdateTime',
    partitions=1,
    value_format='avro'
);

create table KSQL_DEPOSIT_USERS_AVG_10MINS_PROD as
select
    userID,
    AS_VALUE(userID) as user_id,
    substring(TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss'), 1, 10) as dt,
    substring(TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss'), 12, 2) as hour,
    round(avg(coalesce(cast(AMOUNT as int), 0)), 2) as avg_amt,
    count(userID) as cnt_users,
    sum(amount) as amt
from ksql_deposit_table_final_stream_prod
WINDOW TUMBLING (SIZE 10 MINUTE, GRACE PERIOD 10 MINUTE)
where status = 2
group by userID;

The data is present in Kafka, but when I run these queries I get the error below.

[2023-08-08 09:15:12,021] INFO stream-thread [_confluent-ksql-demo_02_query_CTAS_KSQL_DEPOSIT_USERS_AVG_30MINS_TMP_0-8ce568f2-0c02-4ba4-9042-b53aa41bed2f-StreamThread-2] Setting topic 'JRB-DB-SEC-ksql-testdeposit.dbo.DepositTransactions_np' to consume from earliest offset (org.apache.kafka.streams.processor.internals.StreamThread:1045)

[2023-08-08 09:15:12,022] INFO [Consumer clientId=_confluent-ksql-demo_02_query_CTAS_KSQL_DEPOSIT_USERS_AVG_30MINS_TMP_0-8ce568f2-0c02-4ba4-9042-b53aa41bed2f-StreamThread-2-consumer, groupId=_confluent-ksql-demo_02_query_CTAS_KSQL_DEPOSIT_USERS_AVG_30MINS_TMP_0] Seeking to earliest offset of partition JRB-DB-SEC-ksql-testdeposit.dbo.DepositTransactions_np-0 (org.apache.kafka.clients.consumer.internals.SubscriptionState:642)

[2023-08-08 09:15:12,033] INFO [Consumer clientId=_confluent-ksql-demo_02_query_CTAS_KSQL_DEPOSIT_USERS_AVG_30MINS_TMP_0-8ce568f2-0c02-4ba4-9042-b53aa41bed2f-StreamThread-2-consumer, groupId=_confluent-ksql-demo_02_query_CTAS_KSQL_DEPOSIT_USERS_AVG_30MINS_TMP_0] Resetting offset for partition JRB-DB-SEC-ksql-testdeposit.dbo.DepositTransactions_np-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[10.36.26.30:9092 (id: 2 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:399)

[2023-08-08 09:15:12,315] ERROR {"type":0,"deserializationError":{"target":"value","errorMessage":"Error deserializing message from topic: JRB-DB-SEC-ksql-testdeposit.dbo.DepositTransactions_np","recordB64":null,"cause":["Failed to deserialize data for topic JRB-DB-SEC-ksql-testdeposit.dbo.DepositTransactions_np to Avro: ","Error retrieving Avro value schema for id 0","No content with id/hash 'contentId-0' was found.; error code: 40403"],"topic":"JRB-DB-SEC-ksql-testdeposit.dbo.DepositTransactions_np"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.CTAS_KSQL_DEPOSIT_USERS_AVG_30MINS_TMP_0.KsqlTopic.Source.deserializer:44)

Can anyone please help me solve this error?
