Delta lake merge throws a warning when schema is large.

1,440 views
Skip to first unread message

Krishna Raju Kunadharaju

unread,
Jul 7, 2021, 3:27:46 PM7/7/21
to Delta Lake Users and Developers
I am using spark structured streaming to read from kafka and writing to delta using a batch writer.  My batch writer is very similar to the example provided in the docs - https://docs.delta.io/latest/delta-update.html#upsert-from-streaming-queries-using-foreachbatch

When the schema of my data becomes large, I get the following warning every time the batch writer is triggered to upsert into delta.

WARN DAGScheduler: Broadcasting large task binary with size 1046.1 KiB

Note that if I delete some fields from the schema the warning disappears (points to a issue in schema handling in merge rather than a issue with data). On some inspection of the Spark SQL tab in the UI, I see the following in logical plan of delta merge builder operation that might be the issue -

SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........) validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
:

The validate external type appears at least 400 times for the same object. Not sure why the same thing is being serialized so many times for a merge. Would appreciate some help on this!


Krishna

Jacek Laskowski

unread,
Jul 8, 2021, 6:41:40 AM7/8/21
to Krishna Raju Kunadharaju, Delta Lake Users and Developers
Hi,

Can you show the code? I think this SerializeFromObject is due to casting DataFrame to Dataset (which could be unnecessary).

--
You received this message because you are subscribed to the Google Groups "Delta Lake Users and Developers" group.
To unsubscribe from this group and stop receiving emails from it, send an email to delta-users...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/delta-users/bb84402f-9a9f-4049-a762-b52c5ab7f1een%40googlegroups.com.

Krishna Raju Kunadharaju

unread,
Jul 8, 2021, 2:00:29 PM7/8/21
to Delta Lake Users and Developers
Sure, I have posted the code below for the batch writer. Note, I have made the field/column names generic.

-------------------------------------------------------------------------------
def upsert_to_delta(self, micro_batch_output_df, batch_id):


        spark = SparkSession.builder.getOrCreate()
        
         # The next two steps just drop duplicates.

        window = Window.partitionBy(["field1","field2"])
  
        df = micro_batch_output_df.withColumn("maxField", F.max("someField")\
                .over(window))\
                .where(F.col('someField') == F.col('maxField'))\
                .drop('maxField')

        partition_column_1_list = [x[0] for x in df.select('partition_column_1').distinct().collect()]
        partition_column_2_list = [x[0] for x in df.select(partition_column_2').distinct().collect()]
       
        partition_1_string = ','.join("'{}'".format(x) for x in partition_column_1_list)        
         partition_1_string   = ','.join("'{}'".format(x) for x in partition_column_2_list)

        if DeltaTable.isDeltaTable(spark, table):
            deltaTable = DeltaTable.forPath(spark, table_location)
            deltaTable.alias("t")\
            .merge(broadcast(df.alias("s"))," t.partition_column_1 IN ({}) AND t.partition_column_2 IN ({}) AND s.some_field = t.some_field AND s.some_field_2 = t.some_field_2".format(partition_1_string , partition_2_string ))\
            .whenMatchedUpdateAll("s.someField > t.someField")\
            .whenNotMatchedInsertAll()\
            .execute()
        else:
            df.write.format("delta").partitionBy("partition_column_1","partition_column_2").save(table_location)
---------------------------------------------------------------------------------------

Shixiong(Ryan) Zhu

unread,
Jul 13, 2021, 2:42:25 AM7/13/21
to Krishna Raju Kunadharaju, Delta Lake Users and Developers
Hey Krishna,

Could you provide the spark logs? My hunch is this may be related to the window expression you are using.

Best Regards,

Ryan


Krishna Raju Kunadharaju

unread,
Jul 15, 2021, 6:31:35 PM7/15/21
to Delta Lake Users and Developers
Unfortunately, this happens even with the window condition removed. It is somehow tied to the schema, as this only happens if the schema of the data in increased with additional columns that can be nullable.

Some part of the logs, that repeats more than a 100 times continuously  is as follows -

21/07/15 15:05:18 INFO CodecConfig: Compression: SNAPPY
21/07/15 15:05:18 INFO CodecConfig: Compression: SNAPPY
21/07/15 15:05:18 INFO ParquetOutputFormat: Parquet block size to 134217728
21/07/15 15:05:18 INFO ParquetOutputFormat: Parquet page size to 1048576
21/07/15 15:05:18 INFO ParquetOutputFormat: Parquet dictionary page size to 1048576
21/07/15 15:05:18 INFO ParquetOutputFormat: Dictionary is on
21/07/15 15:05:18 INFO ParquetOutputFormat: Validation is off
21/07/15 15:05:18 INFO ParquetOutputFormat: Writer version is: PARQUET_1_0
21/07/15 15:05:18 INFO ParquetOutputFormat: Maximum row group padding size is 8388608 bytes
21/07/15 15:05:18 INFO ParquetOutputFormat: Page size checking is: estimated
21/07/15 15:05:18 INFO ParquetOutputFormat: Min row count for page size check is: 100
21/07/15 15:05:18 INFO ParquetOutputFormat: Max row count for page size check is: 10000
21/07/15 15:05:18 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
Schema here

Wondering if this is any issue here.

Krishna

Krishna Raju Kunadharaju

unread,
Jul 15, 2021, 6:46:52 PM7/15/21
to Delta Lake Users and Developers
This also just seems to be "just a warning" - https://github.com/apache/spark/blob/b5a15035851bfba12ef1c68d10103cec42cbac0c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1424.

But still have no real clue, as to what could be the reason.

Krishna

Reply all
Reply to author
Forward
0 new messages