Streaming Upserts with Delta as both Source and Sink


nichola...@gmail.com

Nov 3, 2020, 4:46:15 PM
to Delta Lake Users and Developers
I have a Kinesis stream that dumps a JSON file every 10 minutes or so. I stream these files into a Delta table used for staging as follows: 

spark.readStream.load(inputPath, format='json', schema=kinesis_schema).createOrReplaceTempView('kinesis_tbl')

kinesis = spark.sql("\
select \
  kinesis_record_id, \
  kinesis_record_type, \
  kinesis_source, \
  kinesis_timestamp, \
  kinesis_record.* \
  from kinesis_tbl")

kinesis.writeStream.format("delta").outputMode("append").option("checkpointLocation", "/delta/staging.kinesis/_checkpoints/etl-from-s3").table("staging.kinesis")

Because of data quality issues it is possible to get duplicates from the stream. To eliminate these duplicates I need to do a streaming upsert from this staging Delta table to a production Delta table. In SQL it would look like this: 

merge into production.kinesis as t
using staging.kinesis as s 
on 
t.kinesis_source = s.kinesis_source and 
t.kinesis_record_id = s.kinesis_record_id
when not matched 
then insert (
kinesis_record_id, 
kinesis_record_type, 
kinesis_source, 
kinesis_timestamp) 
values (
kinesis_record_id, 
kinesis_record_type, 
kinesis_source, 
kinesis_timestamp)

The issue (to the best of my knowledge) is that I can't get a SQL merge/upsert to work as a stream. I can get it to work as a batch job exactly as expected. I tried the following in PySpark: 

spark.readStream.table('staging.kinesis').createOrReplaceTempView("kinesis_stream")

query = spark.sql("\
merge into production.kinesis as t \
using kinesis_stream as s \
on \
t.kinesis_source = s.kinesis_source and \
t.kinesis_record_id = s.kinesis_record_id \
when not matched \
then insert (\
kinesis_record_id, \
kinesis_record_type, \
kinesis_source, \
kinesis_timestamp) \
values (\
kinesis_record_id, \
kinesis_record_type, \
kinesis_source, \
kinesis_timestamp)")

I get the following error: 

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

I've experimented with putting the writeStream.start() command in different places but no matter what I keep getting the same error. I'm unsure how to make this work. At the end of the day I just want a streaming upsert from one Delta table to another, and I care less about how it actually happens so I'm flexible on solutions. 

Chris Hoshino-Fish

Nov 3, 2020, 6:25:13 PM
to nichola...@gmail.com, Delta Lake Users and Developers
Hi Nicholas,

You can find the docs here: https://docs.delta.io/0.7.0/delta-update.html#upsert-from-streaming-queries-using-foreachbatch, which show how to do streaming upserts using foreachBatch.
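The pattern in that doc, adapted to the tables in this thread, is roughly the following sketch (the calls are from the Delta Lake Python API; the checkpoint path and function name are placeholders, not from the doc):

```python
# Sketch only: "production.kinesis" / "staging.kinesis" come from this thread;
# the checkpoint path below is a placeholder.
def upsert_to_delta(micro_batch_df, batch_id):
    """Insert-only merge of one micro-batch into the production table."""
    from delta.tables import DeltaTable  # deferred import keeps the sketch self-contained

    target = DeltaTable.forName(spark, "production.kinesis")  # assumes the ambient `spark` session
    (target.alias("t")
           .merge(micro_batch_df.alias("s"),
                  "t.kinesis_source = s.kinesis_source AND "
                  "t.kinesis_record_id = s.kinesis_record_id")
           .whenNotMatchedInsertAll()
           .execute())

# Wiring, per the doc:
# spark.readStream.table("staging.kinesis") \
#     .writeStream \
#     .foreachBatch(upsert_to_delta) \
#     .option("checkpointLocation", "/delta/production.kinesis/_checkpoints/upsert") \
#     .start()
```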

Let us know if there are any issues!

-Chris



--

Chris Hoshino-Fish

Sr. Solutions Architect

Databricks Inc.

fi...@databricks.com

(415) 610-8520

nichola...@gmail.com

Nov 4, 2020, 12:34:13 PM
to Delta Lake Users and Developers
Thanks Chris, 

I did come across that post and I had two issues with it. First, I couldn't figure out how to reference my Delta files' S3 location properly. The parquet files are located at: 

databricks/oregon-prod/<some number that could be unique to me>/user/hive/warehouse/staging.db/kinesis/

Also, perhaps I don't understand your example, but I'm merely passing the data through, not aggregating, so I'm not sure what streamingAggregatesDF would be in my case. 

Thanks in advance. 

nichola...@gmail.com

Nov 6, 2020, 9:18:34 AM
to Delta Lake Users and Developers
I can't figure out how to translate the example to my use case. Given how easy most things are with streaming in Spark, it feels like this use case (a streaming upsert with Delta tables as source and sink) should be easier, which makes me feel like I'm missing something. 

Shixiong(Ryan) Zhu

Nov 6, 2020, 12:42:57 PM
to nichola...@gmail.com, Delta Lake Users and Developers
The document shows an example of foreachBatch with aggregation. You can also use foreachBatch without aggregation, like this:

def upsertToDelta(microBatchOutputDF, batchId):
  microBatchOutputDF.createOrReplaceTempView("kinesis_stream")


  spark.sql("\
    merge into production.kinesis as t \
    using kinesis_stream as s \
    on \
      t.kinesis_source = s.kinesis_source and \
      t.kinesis_record_id = s.kinesis_record_id \
    when not matched \
    then insert (\
      kinesis_record_id, \
      kinesis_record_type, \
      kinesis_source, \
      kinesis_timestamp) \
    values (\
      kinesis_record_id, \
      kinesis_record_type, \
      kinesis_source, \
      kinesis_timestamp)")
}

query = spark.readStream.table('staging.kinesis') \
  .writeStream \
  .foreachBatch(upsertToDelta) \
  .outputMode("append") \
  .start()



nichola...@gmail.com

Nov 9, 2020, 10:01:04 AM
to Delta Lake Users and Developers
Thanks Ryan, this makes more sense, but I do get a syntax error from the unmatched }.

I experimented with where to put an opening { but didn't have any success. 

[Attachment: Screen Shot 2020-11-09 at 9.58.46 AM.png]

Shixiong(Ryan) Zhu

Nov 9, 2020, 12:14:10 PM
to nichola...@gmail.com, Delta Lake Users and Developers
You can remove the `}` at line 24. I missed it when converting the code from Scala to Python.

Best Regards,

Ryan


Nicholas Sharkey

Nov 9, 2020, 12:50:36 PM
to Shixiong(Ryan) Zhu, Delta Lake Users and Developers
Thanks. The stream now initializes but then fails. The code I execute: 

def upsertToDelta(microBatchOutputDF, batchId):
  microBatchOutputDF.createOrReplaceTempView("kinesis_stream")


  spark.sql("\
    merge into production.kinesis as t \
    using kinesis_stream as s \
    on \
      t.kinesis_source = s.kinesis_source and \
      t.kinesis_record_id = s.kinesis_record_id \
    when not matched \
    then insert (\
      kinesis_record_id, \
      kinesis_record_type, \
      kinesis_source, \
      kinesis_timestamp) \
    values (\
      kinesis_record_id, \
      kinesis_record_type, \
      kinesis_source, \
      kinesis_timestamp)")

query = spark.readStream.table('staging.kinesis') \
  .writeStream \
  .foreachBatch(upsertToDelta) \
  .outputMode("append") \
  .start()


The error: 


py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 206, in call
    raise e
  File "/databricks/spark/python/pyspark/sql/utils.py", line 203, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "<command-1830613532693125>", line 21, in upsertToDelta
    kinesis_timestamp)")
  File "/databricks/spark/python/pyspark/sql/session.py", line 709, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 133, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; tahoe
  at py4j.Protocol.getReturnValue(Protocol.java:476)
  at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
  at com.sun.proxy.$Proxy105.call(Unknown Source)
  at org.apache.spark.sql.execution.streaming.sources
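A likely cause of this last error: the earlier experiment registered "kinesis_stream" as a streaming temp view on the global session (spark.readStream.table('staging.kinesis').createOrReplaceTempView("kinesis_stream")), and the global spark.sql inside foreachBatch can still resolve that streaming view, reproducing the "writeStream.start()" error. The Delta docs' Python examples run the MERGE through the micro-batch's own session instead. A sketch of that fix (the view name kinesis_batch is new here, chosen to avoid the collision):

```python
def upsertToDelta(microBatchOutputDF, batchId):
    # Register the batch under a name that cannot collide with a streaming
    # temp view left on the global session.
    microBatchOutputDF.createOrReplaceTempView("kinesis_batch")
    # Run the MERGE through the micro-batch's own session (the pattern the
    # Delta docs use in Python), not the global `spark`:
    microBatchOutputDF._jdf.sparkSession().sql("""
        MERGE INTO production.kinesis AS t
        USING kinesis_batch AS s
        ON t.kinesis_source = s.kinesis_source
           AND t.kinesis_record_id = s.kinesis_record_id
        WHEN NOT MATCHED THEN INSERT (
            kinesis_record_id, kinesis_record_type,
            kinesis_source, kinesis_timestamp)
        VALUES (
            s.kinesis_record_id, s.kinesis_record_type,
            s.kinesis_source, s.kinesis_timestamp)
    """)
```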