from __future__ import print_function
from config import config
import sys
from sparkutils import sparkstuff as s
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StringType,IntegerType, FloatType, TimestampType
from google.cloud import bigquery
def SendToBigQuery(df, batchId):
    """
    Below uses the standard Spark-BigQuery API to write to the table.
    Additional transformation logic will be performed here.
    """
    s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
class MDStreaming:
    def __init__(self, spark_session, spark_context):
        self.spark = spark_session
        self.sc = spark_context
        self.config = config

    def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        # example message: {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to the topic config['MDVariables']['topic'] -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
            # streamingDataFrame.printSchema()
            """
            "foreach" performs custom write logic on each row, whereas "foreachBatch" performs custom write logic on each micro-batch, here through the SendToBigQuery function.
            foreachBatch(SendToBigQuery) expects two parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique id for each batch.
            Using foreachBatch, we write each micro-batch to the storage defined in our custom logic; in this case we store the output of the streaming application in a Google BigQuery table (a sketch of that wiring follows the code below).
            Note that we are appending data, and the column "rowkey" is defined as a UUID, so it can be used as the primary key.
            """
            result = streamingDataFrame.select( \
                       col("parsed_value.rowkey").alias("rowkey") \
                     , col("parsed_value.ticker").alias("ticker") \
                     , col("parsed_value.timeissued").alias("timeissued") \
                     , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     format('console'). \
                     start()
            # alternative sink, kept here for reference: to write each micro-batch to BigQuery,
            # replace outputMode('append') and format('console') above with
            # outputMode("update"). \
            # foreachBatch(SendToBigQuery). \
        except Exception as e:
            print(f"""{e}, quitting""")
            sys.exit(1)

        print(result.status)
        print(result.recentProgress)
        print(result.lastProgress)
        result.awaitTermination()
if __name__ == "__main__":
    appName = config['common']['appName']
    spark_session = s.spark_session(appName)
    spark_session = s.setSparkConfBQ(spark_session)
    spark_context = s.sparkcontext()
    mdstreaming = MDStreaming(spark_session, spark_context)
    streamingDataFrame = mdstreaming.fetch_data()
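
For reference, writing each micro-batch to BigQuery instead of the console is a matter of wiring foreachBatch(SendToBigQuery) in place of the console sink. A minimal sketch of that variant, reusing streamingDataFrame, config and SendToBigQuery from the code above (just the wiring, not the exact code I run):

# sketch only: the BigQuery variant of the query in fetch_data()
transformed = streamingDataFrame.select(
                  col("parsed_value.rowkey").alias("rowkey"),
                  col("parsed_value.ticker").alias("ticker"),
                  col("parsed_value.timeissued").alias("timeissued"),
                  col("parsed_value.price").alias("price")) \
              .withColumn("currency", lit(config['MDVariables']['currency'])) \
              .withColumn("op_type", lit(config['MDVariables']['op_type'])) \
              .withColumn("op_time", current_timestamp())

result = transformed.writeStream \
             .outputMode("update") \
             .foreachBatch(SendToBigQuery) \
             .start()
result.awaitTermination()

For anyone without the sparkutils module, the write inside SendToBigQuery can be done directly with the spark-bigquery connector along these lines (the temporary GCS bucket name here is just a placeholder, not my actual setting):

def SendToBigQuery(df, batchId):
    # indirect writes with the spark-bigquery connector need a temporary GCS bucket
    df.write \
      .format("bigquery") \
      .mode("append") \
      .option("temporaryGcsBucket", "some-temp-bucket") \
      .save(config['MDVariables']['targetDataset'] + "." + config['MDVariables']['targetTable'])

With the console sink as posted, this is what I get on Spark 3.0.1: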
{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|rowkey |ticker|timeissued |price |currency|op_type|op_time |
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|de2945d3-ad3c-4c59-9f95-17c252ddf5b5|SBRY |2021-03-01 20:56:37|228.0 |GBP |1 |2021-03-01 20:57:09.015|
|2afe4b4b-f2c0-4412-b018-51b4ed16c948|TSCO |2021-03-01 20:56:37|405.38|GBP |1 |2021-03-01 20:57:09.015|
|ae95f3a4-ca8d-4ad3-9171-225ebd0e4271|BP |2021-03-01 20:56:37|628.9 |GBP |1 |2021-03-01 20:57:09.015|
|78b6b869-1fae-4c80-a5f0-f13363e15766|ORCL |2021-03-01 20:56:37|22.77 |GBP |1 |2021-03-01 20:57:09.015|
|77e49b8c-4d98-4b56-b5c9-5af78b9d76cf|MKS |2021-03-01 20:56:37|168.1 |GBP |1 |2021-03-01 20:57:09.015|
|fff1870a-9994-4a74-b075-0fd220e2e838|SAP |2021-03-01 20:56:37|59.61 |GBP |1 |2021-03-01 20:57:09.015|
|6e0db8db-794b-4f64-8915-d5bbd8f1827d|MRW |2021-03-01 20:56:37|272.6 |GBP |1 |2021-03-01 20:57:09.015|
|d18dfe10-76c0-470d-9208-fb9e2113c510|IBM |2021-03-01 20:56:37|154.79|GBP |1 |2021-03-01 20:57:09.015|
|0d9a62c1-533c-45da-9e80-9f40447b7138|VOD |2021-03-01 20:56:37|282.6 |GBP |1 |2021-03-01 20:57:09.015|
|5bce871e-516c-4371-9d5a-67fd5fa8002f|MSFT |2021-03-01 20:56:37|47.24 |GBP |1 |2021-03-01 20:57:09.015|
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
So the data is picked up OK with Spark 3.0.1.
On GCP Dataproc with Spark 3.1.1-RC2 (a cluster I built a few days ago), the same job runs without error but shows no data; it is stuck at Batch 0!
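
In case it helps narrow it down, a one-off batch read against the same topic from the 3.1.1-RC2 cluster should at least confirm whether the brokers and offsets are visible from there. A minimal sketch, reusing the same config keys and spark_session as above:

# quick sanity check on the 3.1.1-RC2 cluster: batch-read everything currently in the topic
batchDF = spark_session.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
    .option("subscribe", config['MDVariables']['topic']) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()
print(batchDF.count())  # a non-zero count means the topic is reachable; the problem would then be on the streaming side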
Let me know if you need any other info.