Delta table MergeSchema option


Canan Girgin

Dec 2, 2021, 2:10:42 AM
to delta...@googlegroups.com


Hi,

I have been using Delta tables for 2 months, and I have a Delta table on Databricks.
In my pipeline I append data from parquet files to this Delta table. New columns may be added to the parquet files from day to day, and I would like these new columns to be added to my Delta table as well. You can find the related code block below; my Spark version is 3.1.2.
It completes successfully without an error and inserts the data into my Delta table, but without the new columns.



val confMergeSchema = "true"
val confDatabricksAutoMergeSchema = "true"

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled" , confDatabricksAutoMergeSchema)

spark.read
  .format("parquet")
  .load(pathRead)
  .repartition(numFiles)
  .write
  .format("delta")
  .option("mergeSchema", confMergeSchema)
  .mode("append")
  .option("path", pathWrite)
  .partitionBy("a","b","c")
  .saveAsTable(s"$dbName.$tableName")

When I change the code as below and add the mergeSchema option to the read, it throws an exception: "Found duplicate column(s) in the data schema"

spark.read
  .option("mergeSchema", confMergeSchema)
  .format("parquet")
  .load(pathRead)
  .repartition(numFiles)
  .write
  .format("delta")
  .mode("append")
  .option("path", pathWrite)
  .partitionBy("a","b","c")
  .saveAsTable(s"$dbName.$tableName")

What am I missing?

Regards.
Canan

 

Canan Girgin

Dec 2, 2021, 5:28:41 AM
to delta...@googlegroups.com
I have resolved my issue :)
I needed to use the ("mergeSchema", "true") option when reading the parquet files; otherwise only the common fields are read.
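
For example, a minimal sketch of the difference on the read side (the path here is made up):

// Without mergeSchema, Spark infers the schema from only a subset of the parquet files,
// so columns that exist only in newer files can be silently dropped.
val dfDefault = spark.read.parquet("/example/parquet/path")

// With mergeSchema, the schemas of all part files are merged, so new columns are kept.
val dfMerged = spark.read
  .option("mergeSchema", "true")
  .parquet("/example/parquet/path")

dfDefault.printSchema()
dfMerged.printSchema()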

Another issue is that parquet is case-sensitive but Spark is not. When I read the whole schema from the parquet files the correct way, I see columns like both user_ID and user_id in the DataFrame.
These columns are not duplicates in a DataFrame schema, but that does not work for a table schema.
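
To illustrate (the column names and table name are just placeholders):

import org.apache.spark.sql.functions.lit

spark.conf.set("spark.sql.caseSensitive", "true")

// With case sensitivity enabled, both spellings can coexist in one DataFrame...
val df = spark.range(3)
  .withColumn("user_ID", lit(1))
  .withColumn("user_id", lit(2))

df.printSchema()   // shows both user_ID and user_id

// ...but table schemas compare names case-insensitively, so appending to the table fails:
// df.write.format("delta").mode("append").saveAsTable("db.tbl")
// -> AnalysisException: Found duplicate column(s) in the data schema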

My solution is to convert all column names to lowercase:

import org.apache.spark.sql.functions.{coalesce, col}

spark.conf.set("spark.sql.caseSensitive", "true")

val dfLoaded = spark
  .read
  .option("mergeSchema", "true")
  .format("parquet")
  .load(pathRead)
  .repartition(numFiles)

// group the columns by their lowercase name and coalesce each group into a single column
val merged = dfLoaded.columns.groupBy(_.toLowerCase)
                   .map(t => coalesce(t._2.map(col): _*).as(t._1))
                   .toArray

dfLoaded
  .select(merged:_*)
  .write
  .format("delta")
  .option("mergeSchema", "true")

  .mode("append")
  .option("path", pathWrite)
  .partitionBy("a","b","c")
  .saveAsTable(s"$dbName.$tableName")

spark.conf.set("spark.sql.caseSensitive","false")
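
After the append, a quick check that the new columns really made it into the Delta table:

spark.table(s"$dbName.$tableName").printSchema()   // the new lowercase columns should now appear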

Regards.
Canan