Change Data Feed Streaming - reading from latest

kowndinya vankayalapati

Apr 9, 2025, 11:08:35 AM
to Delta Lake Users and Developers
Hi,

I am facing a challenge reading the change data feed of a Delta table. We have a job that reads the CDF stream, does some processing, and writes the results to another table.

Normally, things work as expected. However, the very first time we start the job, we don't want to read any old changes. According to the documentation, if we don't provide "startingVersion", the stream should start from now. What we are seeing instead is that, even when "startingVersion" is not provided, the stream starts reading from the latest commit version, so we end up processing at least one existing version.
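For context, the read side of our job looks roughly like the sketch below. The helper and the table path are illustrative, not our actual code; the Delta-specific options involved are just "readChangeFeed" and, optionally, "startingVersion":

```python
def cdf_read_options(starting_version=None):
    """Build the option map for Delta's streaming source with the
    change data feed enabled. When starting_version is None we omit
    "startingVersion" entirely and rely on the documented default,
    which we expected to mean "start from now"."""
    opts = {"readChangeFeed": "true"}
    if starting_version is not None:
        opts["startingVersion"] = str(starting_version)
    return opts


# Used roughly like this (session setup and path are illustrative):
# (spark.readStream
#      .format("delta")
#      .options(**cdf_read_options())   # no startingVersion supplied
#      .load("/delta/source_table"))
```

With no argument, the helper produces only {"readChangeFeed": "true"}, i.e. the case where we expected the stream to begin at the current point in the table.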

Is it possible to avoid this? I also tried "startingTimestamp", but if I give a time later than the latest commit, it throws the exception below:

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)
Caused by: org.apache.spark.sql.delta.DeltaAnalysisException: [DELTA_TIMESTAMP_GREATER_THAN_COMMIT] The provided timestamp (2025-04-09 18:20:00.0) is after the latest version available to this table (2025-04-03 04:59:37.0). Please use a timestamp before or at 2025-04-03 04:59:37.
at org.apache.spark.sql.delta.DeltaErrorsBase.timestampGreaterThanLatestCommit(DeltaErrors.scala:1367)
at org.apache.spark.sql.delta.DeltaErrorsBase.timestampGreaterThanLatestCommit$(DeltaErrors.scala:1363)
at org.apache.spark.sql.delta.DeltaErrors$.timestampGreaterThanLatestCommit(DeltaErrors.scala:3382)
at org.apache.spark.sql.delta.sources.DeltaSource$.getStartingVersionFromTimestamp(DeltaSource.scala:1379)
at org.apache.spark.sql.delta.sources.DeltaSource.getStartingVersion$lzycompute(DeltaSource.scala:1325)
at org.apache.spark.sql.delta.sources.DeltaSource.getStartingVersion(DeltaSource.scala:1298)
at org.apache.spark.sql.delta.sources.DeltaSource.getStartingOffset(DeltaSource.scala:873)
at org.apache.spark.sql.delta.sources.DeltaSource.$anonfun$latestOffsetInternal$2(DeltaSource.scala:907)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.delta.sources.DeltaSource.latestOffsetInternal(DeltaSource.scala:907)
at org.apache.spark.s

Any suggestions on how to ensure we don't read any existing version and only pick up changes made after the stream starts?

Thanks,
Kowndinya