Delta 0.8.0 with MinIO S3 bucket: write exception and a question about schema enforcement


Regin Quinoa

Apr 22, 2021, 11:31:27 AM
to Delta Lake Users and Developers
Thanks for the reply. I'm putting the code here for convenience and will post to the mailing list as well. Below is sample code reproducing three issues:
1. The "HDFSLogStore ... is used to write into a Delta table on a non-HDFS storage system" error is raised while writing to an s3a path, even though the Parquet files are written.
2. Delta loses schema enforcement on the s3a path when a string column is converted to float.
3. Delta enforces the schema as expected when writing to a regular local path.
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Python version 3.7.0 (default, Jun 28 2018 13:15:42)
SparkSession available as 'spark'.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("MyApp") \
...     .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
...     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
...     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
...     .config("spark.delta.logStore.class","org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
...     .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.234") \
...     .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.8.5") \
...     .config("spark.jars.packages", "org.apache.httpcomponents:httpclient:4.5.3") \
...     .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.234") \
...     .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-kms:1.11.234") \
...     .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-core:1.11.234") \
...     .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-s3:1.11.234") \
...     .config("spark.jars.packages", "joda-time:joda-time:2.9.9") \
...     .config("fs.s3a.endpoint", "http://10.231.124.130:9000") \
...     .config("fs.s3a.access.key","minioadmin")\
...     .config("fs.s3a.secret.key","minioadmin")\
...     .config("fs.s3a.path.style.access", True)\
...     .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
...     .getOrCreate()
>>>
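Note: each of the repeated .config("spark.jars.packages", ...) calls above overwrites the previous one, so only the last coordinate list actually reaches Spark; if the other jars still resolve, they were already on the classpath. Collapsing everything into one comma-separated list avoids the ambiguity. A minimal sketch of the same session setup (only the three packages the write actually needs are shown; the endpoint and credentials are copied from above):

from pyspark.sql import SparkSession

# One comma-separated list: repeated .config() calls on the same key
# silently replace each other, so collect every coordinate here.
packages = ",".join([
    "io.delta:delta-core_2.12:0.8.0",
    "org.apache.hadoop:hadoop-aws:2.8.5",
    "com.amazonaws:aws-java-sdk:1.11.234",
])

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", packages) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.class",
            "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .config("fs.s3a.endpoint", "http://10.231.124.130:9000") \
    .config("fs.s3a.access.key", "minioadmin") \
    .config("fs.s3a.secret.key", "minioadmin") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

Also note that spark.jars.packages is only honored when the JVM is launched, so setting it inside an already-running shell has no effect either way; see the note after the stack trace below.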
>>> columns = ["language","users_count"]
>>> data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
>>> rdd = spark.sparkContext.parallelize(data)
>>> dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
>>> dfFromRDD2
DataFrame[language: string, users_count: string]
>>> dfFromRDD2.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

>>> dfFromRDD2.printSchema()
root
 |-- language: string (nullable = true)
 |-- users_count: string (nullable = true)

>>> path = 's3a://security/test'
>>> dfFromRDD2.write.format("delta").save(path)

# Here the parquet files are written, but with the error messages below:

ERROR HDFSLogStore: The error typically occurs when the default LogStore implementation, that
 is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
 In order to get the transactional ACID guarantees on table updates, you have to use the
 correct implementation of LogStore that is appropriate for your storage system.

org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
        at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
        at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
        at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
        at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
        at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
        at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
        at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
        at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
        at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
        at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:628)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:606)
        at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:81)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:578)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
        at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:574)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:569)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:566)
        at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:81)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:439)
        at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
        at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
        at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
        at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:390)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:388)
        at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:81)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:68)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
        at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
        at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
        at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:327)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:567)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:830)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 827, in save
    self._jwrite.save(path)
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o294.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
 is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
 In order to get the transactional ACID guarantees on table updates, you have to use the
 correct implementation of LogStore that is appropriate for your storage system.

        at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:216)
        at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
        [... remaining frames identical to the ERROR stack trace above ...]
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
        [... frames identical to the UnsupportedFileSystemException trace above ...]
        ... 66 more

>>>
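Two things are going on in the trace above. The UnsupportedFileSystemException itself is expected: HDFSLogStore writes through Hadoop's FileContext API, which needs an fs.AbstractFileSystem binding that s3a does not provide, which is exactly why Delta wants a different LogStore for S3. The real question is why S3SingleDriverLogStore was not used even though it was configured: the shell banner above says "SparkSession available as 'spark'", so a session (and its SparkContext) already existed, and getOrCreate() returned it without applying spark.delta.logStore.class to the underlying SparkConf. The documented route is to pass the packages and the LogStore setting on the pyspark command line; restarting the session from inside the shell should also work. A sketch (untested against this MinIO setup; jar packages still have to be supplied at launch time):

# Stop the shell's pre-built session so a fresh SparkContext can pick
# up the LogStore setting from its SparkConf.
spark.stop()

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.class",
            "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .config("fs.s3a.endpoint", "http://10.231.124.130:9000") \
    .config("fs.s3a.access.key", "minioadmin") \
    .config("fs.s3a.secret.key", "minioadmin") \
    .config("fs.s3a.path.style.access", "true") \
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

Equivalently, launch the shell as: pyspark --packages io.delta:delta-core_2.12:0.8.0,org.apache.hadoop:hadoop-aws:2.8.5 --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore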


         

# change the column from string to float
# new parquet files are written, with the same exception messages as above

>>> from pyspark.sql.functions import col
>>> from pyspark.sql.types import *
>>> dfFromRDD2 = dfFromRDD2.withColumn("users_count",col("users_count").cast(FloatType()))
>>> dfFromRDD2.printSchema()
root
 |-- language: string (nullable = true)
 |-- users_count: float (nullable = true)
>>> dfFromRDD2.write.format("delta").mode("overwrite").save(path)

21/04/22 10:03:30 ERROR HDFSLogStore: The error typically occurs when the default LogStore implementation, that
 is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
 In order to get the transactional ACID guarantees on table updates, you have to use the
 correct implementation of LogStore that is appropriate for your storage system.

org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
        [... stack trace identical to the first occurrence above ...]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 827, in save
    self._jwrite.save(path)
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o170.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
 is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
 In order to get the transactional ACID guarantees on table updates, you have to use the
 correct implementation of LogStore that is appropriate for your storage system.

        [... stack trace identical to the first Py4JJavaError above ...]
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
        [... frames identical to the first occurrence above ...]
        ... 66 more

# But if the path is NOT s3a as above but a regular local path,
# changing the type causes Delta to enforce the schema as expected

>>> path
'/tmp/delta_test'

>>> dfFromRDD2.write.format("delta").mode("overwrite").save(path)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 827, in save
    self._jwrite.save(path)
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py", line 137, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Failed to merge fields 'users_count' and 'users_count'. Failed to merge incompatible data types StringType and FloatType;;
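This rejection is the expected behavior, and it explains the apparent loss of enforcement on s3a above: enforcement compares the incoming schema against the schema recorded in the table's _delta_log, and the failed s3a writes never committed a log entry, so there was nothing to enforce against. With a working LogStore, the s3a path should reject the float overwrite the same way. To change a column's type deliberately, Delta accepts an explicit overwriteSchema option together with overwrite mode; a minimal sketch:

# Intentional schema change: replace both the data and the schema
# recorded in the _delta_log. Without this option the write fails
# with the merge error shown above.
dfFromRDD2.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(path)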

Regin Quinoa

Apr 26, 2021, 11:28:41 AM
to Delta Lake Users and Developers
I found a post from about two years ago about Delta Lake 0.1.0 and S3. What is the status as of 0.8.0: is schema enforcement and transactional support there?

Given how popular S3 is right now, its support is a significant factor to consider. Any input would be appreciated.
Thanks.

R Tyler Croy

Apr 26, 2021, 4:23:27 PM
to Regin Quinoa, Delta Lake Users and Developers
(replies inline)



‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐
On Monday, April 26, 2021 8:28 AM, Regin Quinoa <swea...@gmail.com> wrote:

I found a post from about two years ago about Delta Lake 0.1.0 and S3. What is the status as of 0.8.0: is schema enforcement and transactional support there?

Given how popular S3 is right now, its support is a significant factor to consider. Any input would be appreciated.

Delta Lake supports S3 as a first-class storage backend, and many of us, myself included, are using it in production. I also do a fair bit of local development against MinIO specifically for S3-compatibility testing, and can vouch for it there as well :)


The only thing to consider when using S3 as a backend is that concurrent writers to the same exact Delta table will need some external coordination, because S3 lacks an atomic put-if-absent primitive for making transaction-log writes truly atomic.


Otherwise, it works great :)


Regin Quinoa

Apr 27, 2021, 11:01:00 PM
to R Tyler Croy, Delta Lake Users and Developers
I'm hitting the Delta 0.8.0 write exception against a MinIO S3 bucket, plus the schema enforcement question described above.
Would you share some snippet examples of using Delta Lake versioning with S3?
Thanks for replying.
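For reference, versioned reads ("time travel") use the same DataFrame API on an s3a path as anywhere else. A minimal sketch, assuming the table at path has been committed more than once so that version 0 and the latest version differ:

# Read the table as of a specific version recorded in the _delta_log;
# option("timestampAsOf", "...") works analogously with a timestamp.
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(path)
df_latest = spark.read.format("delta").load(path)
df_v0.show()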