Thanks for the reply. I'm putting the code here for convenience and will post to the mailing list as well. Below is the sample code to reproduce the issues:
1. The "HDFSLogStore ... is used to write into a Delta table on a non-HDFS storage system" error when writing to an s3a path, even though the Parquet files are written.
2. Delta loses schema enforcement on s3a when a string column is converted to float.
3. For comparison, Delta does enforce the schema when writing to a regular local path.
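A side note on the repro itself: each .config("spark.jars.packages", ...) call in the session below overwrites the previous value of that key, so only the last package coordinate actually survives, and since the pyspark shell has already created a SparkSession, I am not sure the builder settings take effect at all. A minimal sketch of how I would declare the dependencies instead (one comma-separated list; the coordinates are just the ones from my session, and this sketch is untested here):

from pyspark.sql import SparkSession

# All Maven coordinates go into a single comma-separated value, because repeated
# .config("spark.jars.packages", ...) calls overwrite each other.
packages = ",".join([
    "io.delta:delta-core_2.12:0.8.0",
    "org.apache.hadoop:hadoop-aws:2.8.5",
    "com.amazonaws:aws-java-sdk:1.11.234",
])

spark = (
    SparkSession.builder.appName("MyApp")
    .config("spark.jars.packages", packages)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3SingleDriverLogStore is the LogStore Delta documents for S3-compatible storage.
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .getOrCreate()
)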
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/
Using Python version 3.7.0 (default, Jun 28 2018 13:15:42)
SparkSession available as 'spark'.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("MyApp") \
... .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
... .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
... .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
... .config("spark.delta.logStore.class","org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")\
... .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.234") \
... .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.8.5") \
... .config("spark.jars.packages", "org.apache.httpcomponents:httpclient:4.5.3") \
... .config("spark.jars.packages", "com.amazonaws:aws-java-sdk:1.11.234") \
... .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-kms:1.11.234") \
... .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-core:1.11.234") \
... .config("spark.jars.packages", "com.amazonaws:aws-java-sdk-s3:1.11.234") \
... .config("spark.jars.packages", "joda-time:joda-time:2.9.9") \
... .config("fs.s3a.access.key","minioadmin")\
... .config("fs.s3a.secret.key","minioadmin")\
... .config("fs.s3a.path.style.access", True)\
... .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
... .getOrCreate()
>>>
>>> columns = ["language","users_count"]
>>> data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
>>> rdd = spark.sparkContext.parallelize(data)
>>> dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
>>> dfFromRDD2
DataFrame[language: string, users_count: string]
>>> dfFromRDD2.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
| Java| 20000|
| Python| 100000|
| Scala| 3000|
+--------+-----------+
>>> dfFromRDD2.printSchema()
root
|-- language: string (nullable = true)
|-- users_count: string (nullable = true)
>>> path = 's3a://security/test'
>>> dfFromRDD2.write.format("delta").save(path)
# Here the Parquet files are written, but with the error messages below:
ERROR HDFSLogStore: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:628)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:606)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:578)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:574)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:569)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:566)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:439)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:390)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:388)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:68)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:148)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:327)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:830)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 827, in save
self._jwrite.save(path)
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py", line 131, in deco
return f(*a, **kw)
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o294.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:216)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:628)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:606)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:578)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:574)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:569)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:566)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:439)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:390)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:388)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:68)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:148)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:327)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
... 66 more
>>>
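# Side note (not run in this session): "No AbstractFileSystem configured for scheme: s3a"
# indicates the default HDFSLogStore (FileContext API) was still used, i.e. the
# spark.delta.logStore.class setting apparently never took effect -- my guess is that the
# builder configs were ignored because the pyspark shell had already created a SparkSession.
# A quick way to confirm what is actually set:
print(spark.sparkContext.getConf().get("spark.delta.logStore.class", "not set"))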
# Change the column type from string to float.
# New Parquet files are written, with the same exception messages as above.
>>> from pyspark.sql.functions import col
>>> from pyspark.sql.types import *
>>> dfFromRDD2 = dfFromRDD2.withColumn("users_count",col("users_count").cast(FloatType()))
>>> dfFromRDD2.printSchema()
root
|-- language: string (nullable = true)
|-- users_count: float (nullable = true)
>>> dfFromRDD2.write.format("delta").mode("overwrite").save(path)
21/04/22 10:03:30 ERROR HDFSLogStore: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:628)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:606)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:578)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:574)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:569)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:566)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:439)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:390)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:388)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:68)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:148)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:374)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:830)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 827, in save
self._jwrite.save(path)
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py", line 131, in deco
return f(*a, **kw)
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o170.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
In order to get the transactional ACID guarantees on table updates, you have to use the
correct implementation of LogStore that is appropriate for your storage system.
at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:216)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:628)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:606)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:578)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:574)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:569)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:566)
at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:439)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:390)
at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:388)
at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:81)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:68)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:148)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:374)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3a.impl=null: No AbstractFileSystem configured for scheme: s3a
at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
... 66 more
# But if the path is NOT an s3a path as above but a regular local path,
# changing the type causes Delta to enforce the schema as expected:
>>> path
'/tmp/delta_test'
>>> dfFromRDD2.write.format("delta").mode("overwrite").save(path)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 827, in save
self._jwrite.save(path)
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/home/nshen/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py", line 137, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Failed to merge fields 'users_count' and 'users_count'. Failed to merge incompatible data types StringType and FloatType;;
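For completeness: the AnalysisException on the local path is the schema enforcement I expected. If the string-to-float change were actually intended, my understanding is that Delta requires opting into the schema change explicitly, roughly like this (a sketch, not run here):

# Explicitly replace the existing table schema when overwriting (opt-in schema change).
dfFromRDD2.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(path)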