I have a Kinesis stream that dumps a JSON file to S3 every 10 minutes or so. I stream these files into a staging Delta table as follows:
spark.readStream.load(inputPath, format='json', schema=kinesis_schema) \
    .createOrReplaceTempView('kinesis_tbl')

kinesis = spark.sql("""
    select
        kinesis_record_id,
        kinesis_record_type,
        kinesis_source,
        kinesis_timestamp,
        kinesis_record.*
    from kinesis_tbl""")

(kinesis.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/delta/staging.kinesis/_checkpoints/etl-from-s3")
    .table("staging.kinesis"))
Because of data quality issues, the stream can deliver duplicate records. To eliminate these duplicates I need to do a streaming upsert from this staging Delta table to a production Delta table. In SQL it would look like this:
merge into production.kinesis as t
using staging.kinesis as s
on
    t.kinesis_source = s.kinesis_source and
    t.kinesis_record_id = s.kinesis_record_id
when not matched
then insert (
    kinesis_record_id,
    kinesis_record_type,
    kinesis_source,
    kinesis_timestamp)
values (
    s.kinesis_record_id,
    s.kinesis_record_type,
    s.kinesis_source,
    s.kinesis_timestamp)
The issue (to the best of my knowledge) is that I can't get a SQL merge/upsert to work as a stream; I can get it to work as a batch job exactly as expected. I tried the following in PySpark:
spark.readStream.table('staging.kinesis').createOrReplaceTempView("kinesis_stream")

query = spark.sql("""
    merge into production.kinesis as t
    using kinesis_stream as s
    on
        t.kinesis_source = s.kinesis_source and
        t.kinesis_record_id = s.kinesis_record_id
    when not matched
    then insert (
        kinesis_record_id,
        kinesis_record_type,
        kinesis_source,
        kinesis_timestamp)
    values (
        s.kinesis_record_id,
        s.kinesis_record_type,
        s.kinesis_source,
        s.kinesis_timestamp)""")
I get the following error:
AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
I've experimented with putting the writeStream.start() command in different places, but no matter what I keep getting the same error, and I'm unsure how to make this work. At the end of the day I just want a streaming upsert from one Delta table to another; I care less about how it actually happens, so I'm flexible on solutions.
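For what it's worth, the closest pattern I've come across is Delta's foreachBatch approach, where the merge runs as an ordinary batch operation inside each micro-batch. Below is a minimal, untested sketch of what I think that would look like against my tables. The function names, the `kinesis_updates` temp view, and the checkpoint path are my own placeholders, and `micro_batch_df.sparkSession` assumes Spark 3.3+ (on older versions you'd have to get the session another way):

```python
# Sketch only: assumes Delta Lake is available on the cluster and
# Spark >= 3.3 (for DataFrame.sparkSession). All names are placeholders.

def build_merge_sql(target: str, source_view: str) -> str:
    # Same MERGE as the batch version, parameterised over table names;
    # insert values are qualified with s. to avoid ambiguity.
    return f"""
        merge into {target} as t
        using {source_view} as s
        on
            t.kinesis_source = s.kinesis_source and
            t.kinesis_record_id = s.kinesis_record_id
        when not matched
        then insert (
            kinesis_record_id, kinesis_record_type,
            kinesis_source, kinesis_timestamp)
        values (
            s.kinesis_record_id, s.kinesis_record_type,
            s.kinesis_source, s.kinesis_timestamp)"""

def upsert_to_production(micro_batch_df, batch_id):
    # foreachBatch hands us a *static* DataFrame for each micro-batch,
    # so the merge here is a plain batch operation, not a streaming one.
    micro_batch_df.createOrReplaceTempView("kinesis_updates")
    micro_batch_df.sparkSession.sql(
        build_merge_sql("production.kinesis", "kinesis_updates"))

# Wiring it up (would run on the cluster; not executed here):
#
# (spark.readStream.table("staging.kinesis")
#     .writeStream
#     .foreachBatch(upsert_to_production)
#     .option("checkpointLocation",
#             "/delta/production.kinesis/_checkpoints/upsert")
#     .outputMode("update")
#     .start())
```

If this is the right direction, I'd still appreciate confirmation on whether the checkpoint location and output mode above are sensible for this pattern.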