I am using Spark Structured Streaming to read from Kafka and write to Delta using a foreachBatch writer.
My batch writer is very similar to the example provided in the docs -
https://docs.delta.io/latest/delta-update.html#upsert-from-streaming-queries-using-foreachbatch.
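For context, here is a simplified sketch of what the writer looks like (the table path, key column, and variable names below are placeholders for illustration, not my exact code):

```python
from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # Placeholder path and join key; the real table has a much wider schema.
    target = DeltaTable.forPath(spark, "/delta/events")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "s.id = t.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(streaming_df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .start())
```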
When the schema of my data becomes large, I get the following warning every time the batch writer triggers an upsert into Delta:
WARN DAGScheduler: Broadcasting large task binary with size 1046.1 KiB
Note that if I delete some fields from the schema, the warning disappears, which points to an issue with schema handling in the merge rather than an issue with the data itself. On inspecting the Spark SQL tab in the UI, I see the following in the logical plan of the Delta merge builder operation, which might be the cause:
SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @timestamp), TimestampType), true, false) AS @timestamp#3693456........)
:
The validateexternaltype expression appears at least 400 times for the same object. I'm not sure why the same thing is being serialized so many times for a merge. Would appreciate some help on this!
Krishna