It works fine. I can see the files are created in the default spark-warehouse folder.
But the next time I only want to read the saved table, so I comment out the code for the first two steps and re-run the program, and I get:
AnalysisException: Table or view not found
val transHistory = spark.read
  .option("header", "true")
  .option("inferSchema", true)
  .csv(InputPath + "trainHistory.csv")
transHistory.write.format("delta").mode(SaveMode.Overwrite).saveAsTable("transactionshistory")
val transHistoryTable = spark.read.format("delta").table("transactionshistory")
transHistoryTable.show(10)
Am I doing something wrong? I am using Delta Lake 0.8.0, Spark 3.0, and Scala 2.12.13.
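As a side note, I am assuming the underlying Delta files themselves are still readable by path; this is just a sketch, and the path below is a guess based on the default warehouse location:
// Hypothetical path: default warehouse directory of the managed table "transactionshistory".
val byPath = spark.read.format("delta").load("spark-warehouse/transactionshistory")
byPath.show(10)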
Hi Jacek,
Following is the complete code:
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object TableReadWrite {

  val InputPath = "D:\\Work\\RND\\Spark\\examples\\RetailData\\"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .appName("Delta lake example")
      .getOrCreate()

    val transHistory = spark.read
      .option("header", "true")
      .option("inferSchema", true)
      .csv(InputPath + "trainHistory.csv")

    // Commented out on the second run; the table was created by a previous run.
    //transHistory.write.format("delta").saveAsTable("transactionshistory")

    val transTable = spark.table("transactionshistory")
    transTable.show(10)
  }
}
Following is the stack trace:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: transactionshistory;;
'UnresolvedRelation [transactionshistory]
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:106)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:92)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:89)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:130)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:156)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:153)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:68)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:58)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
at org.apache.spark.sql.SparkSession.table(SparkSession.scala:584)
at org.apache.spark.sql.SparkSession.table(SparkSession.scala:580)
at TableReadWrite$.main(TableReadWrite.scala:22)
at TableReadWrite.main(TableReadWrite.scala)
Notice the commented statement:
//transHistory.write.format("delta").saveAsTable("transactionshistory")
If I uncomment it and remove the transactionshistory folder from the default Spark warehouse path, then it works fine.
Meaning that when I create the table, Spark finds it in the catalog, but when I only read the existing table, it does not find it. I have confirmed this with the catalog APIs.
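For reference, the catalog check I mean is roughly this (just a sketch):
spark.catalog.listTables("default").show()
// Returns true right after saveAsTable, but false on a fresh run that only reads.
println(spark.catalog.tableExists("default", "transactionshistory"))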
This is a very basic example, so I am sure I am making some silly mistake or there is a gap in my understanding, but I am not able to spot it.
Thanks for the help
Regards,
Rajan
Hi All,
Thanks for the responses.
@adamdec85 & @hasejarohit: I am creating the table in the default database, so I had skipped the database name even while creating it, and I can see that the database is created in the default Spark warehouse location, i.e. \spark-warehouse. I even tried giving an explicit database name, i.e. default.transactionshistory, but I still got the same error while reading the existing table.
@Jacek: I am running it directly from IntelliJ.
@Rohit Haseja: Yes, I am following the same link, and I am not using Delta while reading the table.
I checked whether the database and table are present in the catalog with the following APIs:
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()
When the table is not there and I create a new one, I see that the database name is default and it contains a table named transactionshistory, which is a managed (not temporary) table. However, when I run the program only to read the existing table, I still see the default database, but the table is not there.
So the question is why Spark does not load the table.
Regards,
Rajan
CREATE TABLE events USING DELTA LOCATION '/delta/events'
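i.e., from Scala, against the existing warehouse folder, it would be something along these lines (sketch only; the LOCATION path is illustrative and should point at the actual transactionshistory folder under spark-warehouse):
// Re-register the existing Delta directory as a catalog table, then read it as before.
spark.sql(
  """CREATE TABLE IF NOT EXISTS transactionshistory
    |USING DELTA
    |LOCATION 'spark-warehouse/transactionshistory'""".stripMargin)
spark.table("transactionshistory").show(10)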
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .appName("Delta lake example")
  .enableHiveSupport()
  .getOrCreate()
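With Hive support enabled, the table metadata is kept in a persistent metastore (by default a local Derby-backed metastore_db directory) instead of the in-memory catalog, so a table created with saveAsTable in one run should be visible to spark.table in later runs. Roughly (sketch, assuming the table was already written by a previous run of the program):
// The write was done by an earlier run; this run only looks the table up in the metastore.
spark.table("transactionshistory").show(10)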