There seems to be an issue with the latest Dataproc cluster using Spark 3.1.1 release candidate

mich.ta...@gmail.com

Mar 1, 2021, 1:41:31 PM
to Google Cloud Dataproc Discussions
Hi,

I am using the latest build of the Dataproc 2.0.x release versions, which uses Apache Spark 3.1.1-RC2 (release candidate).

In short, a Spark Structured Streaming job runs OK on-premise on Spark 3.0.1 and shows streaming data, but it does not work on Dataproc with Spark 3.1.1.

A further complication is that Spark 3.1.1 is not yet available for download from the Spark site, as shown in the attached image.

Has anyone else encountered this problem by any chance? I have brought this up in the Spark user group forum as well.

Thanks



[Attachment: image (1).png]

i...@google.com

Mar 1, 2021, 3:32:30 PM
to Google Cloud Dataproc Discussions
Hello,

For Dataproc we do not download Spark binaries; we build Spark from source code. This specific version of Spark was built from the v3.1.1-rc2 GitHub tag with additional Dataproc fixes and features.
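If it helps, the exact Spark build on the cluster can be confirmed from a PySpark session; a minimal check, assuming you run it from a pyspark shell on the master node where the spark session is pre-created:

```
# Run from a pyspark shell (or via spark-submit) on the Dataproc master node.
# An image built from the v3.1.1-rc2 tag should report 3.1.1 here.
print(spark.version)
print(spark.sparkContext.version)
```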

mich.ta...@gmail.com

Mar 1, 2021, 3:36:50 PM
to Google Cloud Dataproc Discussions
OK thanks.

Is there any remedy for the issue I observed?

Any workaround?

Thanks

i...@google.com

Mar 1, 2021, 3:48:51 PM
to Google Cloud Dataproc Discussions
Could you clarify what your issue is?

mich.ta...@gmail.com

Mar 1, 2021, 4:04:31 PM
to Google Cloud Dataproc Discussions
Yes, sure.

This is what I published in the Spark user group a couple of days ago.

I have a PySpark program that uses Spark 3.0.1 on-premise to read a Kafka topic and write it to Google BigQuery. This works fine on-premise and loops over micro-batches of data.

At the moment it is displaying the data through Spark Structured Streaming on the console:

```
from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType,IntegerType, FloatType, TimestampType
from google.cloud import bigquery


def SendToBigQuery(df, batchId):

    """
        Below uses standard Spark-BigQuery API to write to the table
        Additional transformation logic will be performed here
    """

    s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])

class MDStreaming:
    def __init__(self, spark_session,spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic']) -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            #streamingDataFrame.printSchema()

            """
               "foreach" performs custom write logic on each row and "foreachBatch" performs custom write logic on each micro-batch through SendToBigQuery function
                foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as DataFrame or Dataset and second: unique id for each batch
               Using foreachBatch, we write each micro batch to storage defined in our custom logic. In this case, we store the output of our streaming application to Google BigQuery table.
               Note that we are appending data and column "rowkey" is defined as UUID so it can be used as the primary key
            """
            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     format('console'). \
                     start()
        except Exception as e:
                print(f"""{e}, quitting""")
                sys.exit(1)

        #foreachBatch(SendToBigQuery). \
        #outputMode("update"). \


        print(result.status)
        print(result.recentProgress)
        print(result.lastProgress)


        result.awaitTermination()

if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mdstreaming.fetch_data()
```
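For reference, the BigQuery sink that is commented out above would be wired in roughly as shown below. This is a sketch only, assuming the same SendToBigQuery function and config defined in the program above:

```
# Sketch only: the same query with the console sink replaced by foreachBatch,
# which hands each micro-batch DataFrame and its batchId to SendToBigQuery.
result = streamingDataFrame.select(
             col("parsed_value.rowkey").alias("rowkey"),
             col("parsed_value.ticker").alias("ticker"),
             col("parsed_value.timeissued").alias("timeissued"),
             col("parsed_value.price").alias("price")) \
         .withColumn("currency", lit(config['MDVariables']['currency'])) \
         .withColumn("op_type", lit(config['MDVariables']['op_type'])) \
         .withColumn("op_time", current_timestamp()) \
         .writeStream \
         .foreachBatch(SendToBigQuery) \
         .outputMode("update") \
         .start()

result.awaitTermination()
```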
The program above runs OK on-premise on Spark 3.0.1 and shows:
```
{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|rowkey                              |ticker|timeissued         |price |currency|op_type|op_time                |
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|de2945d3-ad3c-4c59-9f95-17c252ddf5b5|SBRY  |2021-03-01 20:56:37|228.0 |GBP     |1      |2021-03-01 20:57:09.015|
|2afe4b4b-f2c0-4412-b018-51b4ed16c948|TSCO  |2021-03-01 20:56:37|405.38|GBP     |1      |2021-03-01 20:57:09.015|
|ae95f3a4-ca8d-4ad3-9171-225ebd0e4271|BP    |2021-03-01 20:56:37|628.9 |GBP     |1      |2021-03-01 20:57:09.015|
|78b6b869-1fae-4c80-a5f0-f13363e15766|ORCL  |2021-03-01 20:56:37|22.77 |GBP     |1      |2021-03-01 20:57:09.015|
|77e49b8c-4d98-4b56-b5c9-5af78b9d76cf|MKS   |2021-03-01 20:56:37|168.1 |GBP     |1      |2021-03-01 20:57:09.015|
|fff1870a-9994-4a74-b075-0fd220e2e838|SAP   |2021-03-01 20:56:37|59.61 |GBP     |1      |2021-03-01 20:57:09.015|
|6e0db8db-794b-4f64-8915-d5bbd8f1827d|MRW   |2021-03-01 20:56:37|272.6 |GBP     |1      |2021-03-01 20:57:09.015|
|d18dfe10-76c0-470d-9208-fb9e2113c510|IBM   |2021-03-01 20:56:37|154.79|GBP     |1      |2021-03-01 20:57:09.015|
|0d9a62c1-533c-45da-9e80-9f40447b7138|VOD   |2021-03-01 20:56:37|282.6 |GBP     |1      |2021-03-01 20:57:09.015|
|5bce871e-516c-4371-9d5a-67fd5fa8002f|MSFT  |2021-03-01 20:56:37|47.24 |GBP     |1      |2021-03-01 20:57:09.015|
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
```

So data is picked up OK with Spark 3.0.1.


On GCP Dataproc with Spark 3.1.1-RC2 (on a cluster I built a few days ago), it runs without error but shows no data. It is stuck at Batch 0!
```
{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

```
Let me know if you need any other info.

Thanks

mich.ta...@gmail.com

Mar 1, 2021, 4:16:39 PM
to Google Cloud Dataproc Discussions
Just to confirm: as per the Structured Streaming documentation, one can monitor the streaming status with

```
result = streamingDataFrame.select(...). \
         writeStream. \
         outputMode('append'). \
         option("truncate", "false"). \
         format('console'). \
         start()

print(result.status)
print(result.recentProgress)
print(result.lastProgress)
```

OK, so they should tell us something.
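A slightly more informative check might be to poll the progress periodically rather than printing it once right after start(). Here is a minimal sketch, assuming the result query from the code above:

```
import time

# Poll the streaming query a few times after start(); if numInputRows stays
# at 0 in every progress entry, the Kafka source is not delivering records.
for _ in range(10):
    time.sleep(10)
    progress = result.lastProgress   # dict, or None before the first batch
    if progress:
        print(progress['batchId'], progress['numInputRows'])
```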

When I run it on-premise, where the streaming data is displayed via format('console'), I see the below:

```
{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|rowkey                              |ticker|timeissued         |price |currency|op_type|op_time               |
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|e4c02434-fa1f-4e8e-ad94-40c2782e9681|MRW   |2021-03-01 15:11:44|293.75|GBP     |1      |2021-03-01 15:12:16.49|
```
etc ..
 
On the other hand, when I run it in the Google Cloud cluster, I see exactly the same diagnostics BUT no data!

```
{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+
```

So the monitoring output does not tell us anything.

i...@google.com

Mar 1, 2021, 4:26:15 PM
to Google Cloud Dataproc Discussions
Could you test your job on a Dataproc cluster with the `2.0.0-RC22-debian10` image version (the last image version with Spark 3.0.1)?
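For reference, a test cluster pinned to that image version could also be created programmatically. Below is a rough sketch with the google-cloud-dataproc Python client; the project, region, cluster name and machine types are placeholders:

```
from google.cloud import dataproc_v1

# Sketch only: create a minimal cluster on the last Spark 3.0.1 image version.
# Project, region, cluster name and machine types below are placeholders.
project_id = "my-project"
region = "europe-west2"

client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"})

cluster = {
    "project_id": project_id,
    "cluster_name": "spark-301-test",
    "config": {
        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
        "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
        "software_config": {"image_version": "2.0.0-RC22-debian10"},
    },
}

operation = client.create_cluster(
    request={"project_id": project_id, "region": region, "cluster": cluster})
print(f"Created cluster: {operation.result().cluster_name}")
```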

Also, do you read/write from GCS or HDFS on the Dataproc cluster?

mich.ta...@gmail.com

Mar 1, 2021, 4:46:23 PM
to Google Cloud Dataproc Discussions
Hi,

Thanks. However, every cluster costs money, unless I can agree a deal with Google so that I can build a 3.0.1 cluster at no extra cost and test it.

This is Spark streaming. The test data is created on the master node of the Dataproc cluster and written to a flat file before being picked up by a Kafka cluster running on Docker (one ZooKeeper and one Kafka broker). These work, as I have used the same design with the old Spark 2.3.3 on a previous Dataproc cluster. The difference is that we are now using Python and Spark 3.1.1 on the new Dataproc cluster. I can get the data out on the command line:

```
docker run --rm ches/kafka kafka-console-consumer.sh --zookeeper ctpcluster-m:2181, ctpcluster-w-0:2181, ctpcluster-w-1:2181  --from-beginning --topic md

{"rowkey":"351c192b-dfb6-4dcd-abcf-05b88d61d76f","ticker":"BP", "timeissued":"2021-03-01T21:44:41", "price":435.42}
{"rowkey":"aef72715-59af-4886-8574-519f63fc4fa5","ticker":"MSFT", "timeissued":"2021-03-01T21:44:46", "price":28.97}
{"rowkey":"c21c67f4-e6bc-4fc3-ba08-6a69ad9e5306","ticker":"VOD", "timeissued":"2021-03-01T21:44:46", "price":312.98}
{"rowkey":"fad7268e-ccb7-4e40-a639-7fc55a100a62","ticker":"ORCL", "timeissued":"2021-03-01T21:44:46", "price":41.5}
{"rowkey":"1ae57a64-c414-47b8-b02e-6b9f1d3c500a","ticker":"TSCO", "timeissued":"2021-03-01T21:44:46", "price":388.03}
{"rowkey":"171935d8-5c0a-4738-b93b-31ec8165062a","ticker":"BP", "timeissued":"2021-03-01T21:44:46", "price":614.0}
```

i...@google.com

Mar 1, 2021, 5:43:50 PM
to Google Cloud Dataproc Discussions
Could you test Spark 3.1 on-prem then (https://dist.apache.org/repos/dist/dev/spark/v3.1.1-rc2-bin/)? I want to understand whether the issue is with Dataproc, Spark, or GCS/HDFS.

mich.ta...@gmail.com

Mar 1, 2021, 5:53:38 PM
to Google Cloud Dataproc Discussions
Well, I am afraid we do not have the bandwidth or spare boxes to install 3.1.1 on-premise.

I reiterate my earlier suggestion that Google provide credit for me to build and test this on Dataproc with Spark 3.0.1, which will benefit Google customers as well.

Let me know your thoughts.

HTH

mich.ta...@gmail.com

Mar 1, 2021, 6:07:49 PM
to Google Cloud Dataproc Discussions
Please also note that I am a Google Advantage partner, and I would like us to get a resolution and a quick fix if this is indeed a Spark release matter.

Google may prefer to communicate with me directly.

thx

i...@google.com

Mar 1, 2021, 7:23:14 PM
to Google Cloud Dataproc Discussions
To get GCP credits you need to contact your account manager, not the Dataproc team. You can point to this thread as justification when you reach out to them.

mich.ta...@gmail.com

Mar 2, 2021, 3:55:20 AM
to Google Cloud Dataproc Discussions
OK, thanks, I will action this.

Regards,

Mich

Mich Talebzadeh

Mar 2, 2021, 8:48:17 AM
to Google Cloud Dataproc Discussions
FYI, I have raised a case with Google (Case #27069138) so I can get credit for setting up a 3.0.1 cluster. Appreciate your help.

Can you confirm that the image for building the 3.0.1 cluster, as suggested in this email trail, is the Dataproc `2.0.0-RC22-debian10` image?

 
Thanks