Streaming from a Hive Managed Delta Table


Ed P

Jun 10, 2021, 12:32:57 AM
to Delta Lake Users and Developers
I am sure this is a configuration issue; I am just not sure what I need to set.
Testing to date: 
1. Kafka -> parquet table on hdfs = success
2. Kafka -> Delta Table on hdfs = success
3. Kafka -> Delta table in hive = success (managed table)
4. Stream from Hive managed table -> stream to new Hive managed table = Error
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table schema is not set.  Write data into it or use CREATE TABLE to set the schema.;

In these test cases, I parameterized the type of storage (Parquet or Delta) and the storage location (HDFS path or Hive database.table) for code consistency.

The issue is when I stream from a Hive Managed Delta table. 
What is happening here? How is it that I can write to a delta table and then when I stream from the same table (configuration untouched), I get an exception? 



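import org.apache.spark.sql.SparkSession

// mapConfig is the poster's own configuration map (not shown in the thread).
// Note: Spark's documented warehouse setting is "spark.sql.warehouse.dir";
// the key below is kept exactly as posted.
val spark = SparkSession.builder()
  .appName("Syslog Transformation")
  .config("spark.sql.warehouse.directory", s"${mapConfig("warehouseLocation")}")
  .enableHiveSupport()
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()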
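
For readers who hit the same exception: the thread never confirms the cause, but Delta raises "Table schema is not set" when the streaming source finds no schema at the location it is given, for example when a catalog name such as database.table is passed to load() and is therefore treated as a file path. A minimal sketch of a parameterized read that separates the two cases is shown below in PySpark. It assumes Spark 3.1 or later (where readStream.table is available); is_table_name and read_delta_stream are hypothetical helper names, and the name-versus-path check is only a heuristic.

```python
import re

# Hypothetical heuristic for this sketch: one to three dot-separated
# identifiers count as a catalog table name; anything else is a path.
_TABLE_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*){0,2}$")


def is_table_name(location: str) -> bool:
    """Return True if location looks like 'table' or 'db.table', not a path."""
    return bool(_TABLE_NAME.match(location))


def read_delta_stream(spark, location: str):
    """Open a streaming read on a Delta table given by catalog name or by path."""
    if is_table_name(location):
        # Resolved through the session catalog (metastore), Spark 3.1+.
        return spark.readStream.table(location)
    # Otherwise treat the value as a directory that contains _delta_log.
    return spark.readStream.format("delta").load(location)
```

With an active session, read_delta_stream(spark, "syslog.events") would go through the catalog, while read_delta_stream(spark, "/tmp/delta-table") would read the directory directly; both names are placeholders.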





Jacek Laskowski

Jun 11, 2021, 5:42:02 AM
to Ed P, Delta Lake Users and Developers
Hi,

Can you run EXPLAIN on the failing query and share its query plan? Can you show the whole stack trace? Which versions of Spark and Delta are you using?


Ed P

Jun 12, 2021, 3:47:40 AM
to Delta Lake Users and Developers
Hi Jacek,

I was able to save the Delta tables to HDFS, and everything works fine when using "save" instead of "saveAsTable".

It is when I save a delta table into Hive and then try to stream data from it that I get the error. No change to my code.

So I think it has something to do with streaming data from a managed Hive table (maybe that is not supported?). BTW, I have not installed Apache Hive; I am just using the Hive support in Spark.

I am OK with managing the Delta tables externally from Hive, since a simple "create" statement creates a Hive table that my SQL tool can query. I did notice that the Delta table's field information does not show in my SQL tool (Parquet tables do show the field metadata), and I suspect that is also why I have trouble getting Power BI to read the tables when connecting to the Thrift server.
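
The "create" statement itself is not shown in the thread. One common form, sketched here as an assumption, registers an existing Delta directory in the Spark catalog with CREATE TABLE ... USING DELTA LOCATION. The helper name, table name, and path below are placeholders.

```python
def create_table_over_delta_path(table: str, path: str) -> str:
    """Build Spark SQL that registers an existing Delta directory as a table."""
    return f"CREATE TABLE IF NOT EXISTS {table} USING DELTA LOCATION '{path}'"


# Placeholder table name and path, for illustration only.
ddl = create_table_over_delta_path("syslog.events", "/tmp/delta-table")
print(ddl)
# With an active, Delta-enabled SparkSession: spark.sql(ddl)
```

The table is then external: the data and the Delta log stay at the given path, and the metastore only stores a pointer to it.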

I have been looking at your website to learn more about Hive. If you have specific areas where I should study, I would appreciate the guidance.

Ed

Mich Talebzadeh

Jun 12, 2021, 5:09:36 AM
to Ed P, Delta Lake Users and Developers
Hi,

Your points


> I was able to save the Delta tables to HDFS, and everything works fine when using "save" instead of "saveAsTable".

OK, writing to an HDFS directory is straightforward:

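>>> from pyspark.sql import SparkSession
>>> from delta import configure_spark_with_delta_pip
>>> builder = SparkSession.builder.appName("delta-test")  # assumed; the original post omitted the builder and its Delta settings
>>> spark = configure_spark_with_delta_pip(builder).getOrCreate()
>>> data = spark.range(0, 5)
>>> data.write.format("delta").save("/tmp/delta-table")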


Your other point

> It is when I save a delta table into Hive and then try to stream data from it that I get the error. No change to my code.
>
> So I think it has something to do with streaming data from a managed Hive table (maybe that is not supported?). BTW, I have not installed Apache Hive; I am just using the Hive support in Spark.

OK, if you have not installed Hive on top of Hadoop, how do you expect this to work? Again, I am talking about Hive, not HDFS.

Streaming to HDFS works fine:

>>> streamingDf = spark.readStream.format("rate").load()
>>> stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

-------------------------------------------
Batch: 150
-------------------------------------------
+---+
| id|
+---+
|258|
+---+


For Hive support, see the Hive connector section of https://github.com/delta-io/connectors

I have not configured the Delta serialiser for Hive yet, so I get a warning:


>>> sqlText = """CREATE TABLE test.events (
...   date DATE,
...   eventId STRING,
...   eventType STRING,
...   data STRING)
... USING DELTA
... PARTITIONED BY (date)
... """
>>> spark.sql(sqlText)
2021-06-12 09:32:12,372 WARN hive.HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `test`.`events` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.


In the Hive Thrift server I then run:


0: jdbc:hive2://rhes75:10099/default> use test;
No rows affected (0.012 seconds)
0: jdbc:hive2://rhes75:10099/default> desc events;
+-----------+----------------+--------------------+
| col_name  |   data_type    |      comment       |
+-----------+----------------+--------------------+
| col       | array<string>  | from deserializer  |
+-----------+----------------+--------------------+
1 row selected (0.318 seconds)


So you need Hive to be installed, plus some more configuration work.
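
As a rough idea of what that extra work could involve, the Hive connector linked above reads a Delta table through an external Hive table declared with the io.delta.hive.DeltaStorageHandler storage handler. The sketch below only builds such a HiveQL statement as a string; it is an assumption based on the connector's documentation, not something run in this thread. The helper name, the table name test.events_hive, the location, and the Hive column types are all hypothetical, and the connector jars would also have to be available to Hive.

```python
def hive_delta_table_ddl(table, columns, location):
    """Build HiveQL for an external table backed by the Delta storage handler."""
    column_list = ", ".join(f"`{name}` {hive_type}" for name, hive_type in columns)
    return (
        f"CREATE EXTERNAL TABLE {table} ({column_list})\n"
        "STORED BY 'io.delta.hive.DeltaStorageHandler'\n"
        f"LOCATION '{location}'"
    )


# Columns mirror the test.events example; the table name and path are placeholders.
ddl = hive_delta_table_ddl(
    "test.events_hive",
    [("date", "DATE"), ("eventId", "STRING"), ("eventType", "STRING"), ("data", "STRING")],
    "/path/to/delta/events",
)
print(ddl)
```

The statement is meant to be run in Hive (for example through beeline), not through spark.sql, and the declared columns need to match the Delta table's schema.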


HTH





 



Ed P

Jun 13, 2021, 4:25:34 AM
to Delta Lake Users and Developers
I found some information on GitHub that I think describes this issue, or something related to it. It looks like I need to install Apache Hive to get the metadata exposed correctly and to connect Power BI through the Thrift server:

Currently thriftserver's SparkGetColumnsOperation uses old SessionCatalog object, which relies on hive metastore. Instead, TableCatalog should be used, which will delegate to correct catalog in case of different table providers (delta provider in your case).
https://github.com/apache/spark/blob/e958833c727442fc9efa4fc92f93db16cd5c8476/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala#L57

Mich Talebzadeh

Jun 13, 2021, 2:54:15 PM
to Ed P, Delta Lake Users and Developers
Hi Ed,

You may be interested in this thread:

Creating Hive external table with STORED BY 'io.delta.hive.DeltaStorageHandler' throws error

HTH