Pushdown filter in case of Delta streaming


amit joshi

Feb 23, 2021, 10:56:30 PM
to Delta Lake Users and Developers
Hi team,

I have a use case where we need to stream a Delta table into multiple queries, each filtered on one of the partition columns.
For example, with a Delta table partitioned on the year column:
spark.readStream.format("delta").load("/tmp/delta-table/").where("year = 2013").explain()

The physical plan shows the filter being applied after the streaming source.

My question is: does predicate pushdown work with Delta streaming queries?
That is, can a query stream only a specific partition from the Delta table?


Regards
Amit Joshi

Shixiong(Ryan) Zhu

Feb 24, 2021, 12:06:29 AM
to amit joshi, Delta Lake Users and Developers
Hey Amit,

If you are using OSS Delta, the answer is no. The Delta streaming source uses the Data Source V1 API, which doesn't support filter pushdown.
If you are using Databricks Runtime, the answer is yes. We made custom changes in our Spark fork to identify the Delta streaming source and push filters down to it. However, those changes cannot be contributed to open source Apache Spark, because the project won't accept changes made specifically for one data source.

The good news is that the Data Source V2 API for Structured Streaming supports filter pushdown. Once we migrate the Delta streaming source to the V2 API, it will support filter pushdown.

Best Regards,
Ryan
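To make the V1/V2 distinction above concrete, here is a minimal, Spark-free Scala sketch of what partition pushdown saves on a table partitioned by year. All names (`Record`, `FileEntry`, `ScanResult`, `scanWithoutPushdown`, `scanWithPushdown`) are hypothetical stand-ins, not Delta or Spark classes. Without pushdown the source reads every file and the engine filters rows afterwards, which is what a Filter node sitting above the streaming relation means; with pushdown the source receives the predicate and skips non-matching partitions before reading. (In Spark's real Data Source V2 API, a scan builder opts into this through interfaces such as `SupportsPushDownFilters`.)

```scala
// Hypothetical model of a partitioned table; these are NOT Delta/Spark types.
final case class Record(year: Int, data: String)
// Every record in a file shares the file's partition value.
final case class FileEntry(path: String, partitionYear: Int, rows: Seq[Record])
final case class ScanResult(rows: Seq[Record], filesRead: Int)

// V1-style: the source returns everything, the engine filters afterwards.
def scanWithoutPushdown(files: Seq[FileEntry], year: Int): ScanResult = {
  val allRows = files.flatMap(_.rows)                     // every file is read
  ScanResult(allRows.filter(_.year == year), files.size)  // filter runs after the scan
}

// Pushdown-style: the source sees the predicate and prunes partitions first.
def scanWithPushdown(files: Seq[FileEntry], year: Int): ScanResult = {
  val pruned = files.filter(_.partitionYear == year)      // skip non-matching partitions
  ScanResult(pruned.flatMap(_.rows), pruned.size)
}

val files = Seq(
  FileEntry("year=2012/part-0", 2012, Seq(Record(2012, "a"), Record(2012, "b"))),
  FileEntry("year=2013/part-0", 2013, Seq(Record(2013, "c"))),
  FileEntry("year=2014/part-0", 2014, Seq(Record(2014, "d")))
)

val v1 = scanWithoutPushdown(files, 2013)
val v2 = scanWithPushdown(files, 2013)
println(s"without pushdown: ${v1.filesRead} files read, rows = ${v1.rows}")
println(s"with pushdown:    ${v2.filesRead} files read, rows = ${v2.rows}")
```

Both variants return the same rows; they differ only in how many files the source has to read, which is why pushdown matters more as the number of partitions grows.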


amit joshi

Feb 24, 2021, 12:46:08 AM
to Shixiong(Ryan) Zhu, Delta Lake Users and Developers
Hi Shixiong,

Thanks for your quick response.
Yes, you are correct: I am using OSS Delta.

Could you share a timeline for when a Delta source based on the V2 API will be available in OSS?
Could you also suggest how to handle this use case for now, in particular running multiple streaming queries on the same Delta table, each reading a different partition?


Regards
Amit
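One common workaround while pushdown is unavailable (a general suggestion, not an answer given in this thread) is to run a single streaming query over the whole table and fan each micro-batch out by partition value, for example inside Spark's `foreachBatch`, so the table is read once instead of once per query. The Spark-free sketch below models only that routing step; `Record`, `routeBatch`, and the per-year sinks are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical record carrying its partition value (year).
final case class Record(year: Int, data: String)

// Route one micro-batch to per-partition handlers. Records whose partition
// has no registered handler are returned so the caller can decide what to do.
def routeBatch(batch: Seq[Record], handlers: Map[Int, Seq[Record] => Unit]): Seq[Record] = {
  batch.groupBy(_.year).foreach { case (year, records) =>
    handlers.get(year).foreach(handle => handle(records))
  }
  batch.filterNot(r => handlers.contains(r.year))
}

// Hypothetical sinks; in a real job each handler might write to its own table.
val sink2013 = mutable.ArrayBuffer.empty[Record]
val sink2014 = mutable.ArrayBuffer.empty[Record]
val to2013: Seq[Record] => Unit = rs => sink2013 ++= rs
val to2014: Seq[Record] => Unit = rs => sink2014 ++= rs

val batch = Seq(Record(2012, "a"), Record(2013, "b"), Record(2014, "c"), Record(2013, "d"))
val unrouted = routeBatch(batch, Map(2013 -> to2013, 2014 -> to2014))
println(s"2013: $sink2013, 2014: $sink2014, unrouted: $unrouted")
```

Records for partitions that no consumer subscribed to are returned rather than silently dropped, so the caller can choose to ignore or log them.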

Kumud Kumar Srivatsava Tirupati

Feb 21, 2023, 3:25:13 PM
to Delta Lake Users and Developers
Hi
Is this supported in OSS Delta now? I ran explain on my streaming query with OSS Delta version 2.2.0 and got the output below. Does this indicate partition pruning?

```
spark.readStream.format("delta").option("startingVersion", 0).load(inputPath).where("source_db = 'db1' AND source_table = 'table1'").explain(extended = true)

== Parsed Logical Plan ==
'Filter (('source_db = db1) AND ('source_table = table1))
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6b31915f,delta,List(),None,List(),None,Map(startingVersion -> 0, path -> s3a://hs-dev-dl/poc/delta/staging_test_ws),None), delta, [id#302, source_db#303, source_table#304, data#305]

== Analyzed Logical Plan ==
id: int, source_db: string, source_table: string, data: string
Filter ((source_db#303 = db1) AND (source_table#304 = table1))
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6b31915f,delta,List(),None,List(),None,Map(startingVersion -> 0, path -> s3a://hs-dev-dl/poc/delta/staging_test_ws),None), delta, [id#302, source_db#303, source_table#304, data#305]

== Optimized Logical Plan ==
Filter ((isnotnull(source_db#303) AND isnotnull(source_table#304)) AND ((source_db#303 = db1) AND (source_table#304 = table1)))
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@6b31915f,delta,List(),None,List(),None,Map(startingVersion -> 0, path -> s3a://hs-dev-dl/poc/delta/staging_test_ws),None), delta, [id#302, source_db#303, source_table#304, data#305]

== Physical Plan ==
*(1) Filter ((isnotnull(source_db#303) AND isnotnull(source_table#304)) AND ((source_db#303 = db1) AND (source_table#304 = table1)))
+- StreamingRelation delta, [id#302, source_db#303, source_table#304, data#305]
```
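As far as this output goes, it does not show pruning: the predicate is still a separate Filter node above StreamingRelation, the same shape described at the start of the thread. Note also that StreamingRelation is only a placeholder before the query starts; the plan actually executed for a micro-batch can be inspected on the running query with `StreamingQuery.explain()`. To check plan text programmatically, one rough heuristic is to look for a non-empty `PartitionFilters: [...]` entry, which Spark's file scans print in their physical plans. The sketch below is a plain-Scala string check under that assumption; `showsPartitionPruning` is a hypothetical helper, and its regex will not cover every plan format.

```scala
// Heuristic (an assumption, not an official API): a file scan that prunes
// partitions prints a non-empty "PartitionFilters: [...]" entry in its plan.
val partitionFiltersEntry = """PartitionFilters: \[([^\]]*)\]""".r

def showsPartitionPruning(planText: String): Boolean =
  partitionFiltersEntry.findAllMatchIn(planText).exists(_.group(1).trim.nonEmpty)

// Physical plan copied from the explain output above.
val streamingPlan =
  """*(1) Filter ((isnotnull(source_db#303) AND isnotnull(source_table#304)) AND ((source_db#303 = db1) AND (source_table#304 = table1)))
    |+- StreamingRelation delta, [id#302, source_db#303, source_table#304, data#305]""".stripMargin

// Hand-written, illustrative batch-scan line (not captured from a real run).
val illustrativeBatchScan =
  "FileScan parquet [id#0,year#1] PartitionFilters: [isnotnull(year#1), (year#1 = 2013)], PushedFilters: []"

println(showsPartitionPruning(streamingPlan))
println(showsPartitionPruning(illustrativeBatchScan))
```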