We have a CDAP Spark program (Spark Streaming, not batch) that processes the data.
Problem:
The CDAP Spark program cannot access a Dataset from within Spark transformation functions.
Use Case Steps:
1. Receive records from the upstream source (a Kafka queue).
2. Extract the value of an identifier named ‘ISIN’. E.g., the value is ‘ABCD001EF’.
3. Run an ad-hoc query against the ‘Instrument’ Dataset to get the values of some additional fields (e.g., the instrument PermID for this ISIN).
4. Build the Quote and related Identifier / Relationship records.
5. Save the Quote and related records into a Dataset.
In the Spark programming model, all the data-processing steps (steps 2 to 5) are implemented in Spark transformation functions, which run on the Spark executors (servers in the YARN cluster).
Implementations:
#1. If we can get the dataset within Spark transformation functions, the pseudocode looks like the following (the real code may be a little more complex, but the basic logic is the same):
dstream.map { record =>
  val isin = getIsin(record)
  val instrumentTable = getDataset(instrumentDatasetName)
  val instrumentId = instrumentTable.get(rowKey) // rowKey is built from the ISIN
  val quote = buildQuote(record, instrumentId, isin) // quote includes Quote / Identifier / Relationship fields
  val quoteTable = getDataset(quoteDatasetName)
  quoteTable.write(quoteRowKey, quote) // save the quote into the dataset; real code may write to multiple tables
}
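The per-record flow above can be sketched without Spark or CDAP, using plain in-memory maps as stand-ins for the Datasets (all names here, such as getDataset replaced by the maps, the row-key formats, and buildQuote, are hypothetical illustrations, not real CDAP API):

```scala
import scala.collection.mutable

// In-memory stand-ins for the CDAP Datasets; in the real program the
// hypothetical getDataset calls would return these from the runtime context.
val instrumentTable = Map("ISIN:ABCD001EF" -> "PERMID-42")
val quoteTable = mutable.Map.empty[String, String]

def getIsin(record: String): String = record.split(',')(0)

def buildQuote(record: String, instrumentId: String, isin: String): String =
  s"quote(isin=$isin, permid=$instrumentId, payload=$record)"

// The same per-record flow as the pseudocode above, over a plain list
// instead of a DStream:
val records = Seq("ABCD001EF,100.5", "ABCD001EF,101.0")
records.foreach { record =>
  val isin = getIsin(record)
  val instrumentId = instrumentTable("ISIN:" + isin) // ad-hoc lookup per record
  val quote = buildQuote(record, instrumentId, isin)
  quoteTable("QUOTE:" + record) = quote              // save the quote
}
```

Each record does its own lookup and write, with no data leaving the worker that processes it.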
Advantages:
1) Performance.
In the sample code above, everything (the ad-hoc query, building the record, saving the record) is done inside Spark transformation functions, so all records can be processed in parallel across the Spark executors, in different DStream batches. (We could also split the logic into a few separate RDD transformation functions if necessary; that is a design decision and does not affect the basic logic.)
2) Common API Model.
We can also build a common API model to handle this data-processing logic, not only for the Spark program but also for all the other components (tools / services) in the BIMBQM system.
Using a common API function, the Spark code may look like:
dstream.map { record =>
  val quote = api.buildQuote(record)
  api.save(quote)
}
In the code above, the api.buildQuote and api.save functions implement the logic shown in the first code snippet, so the common logic can be shared across the different components that use the common API.
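One possible shape for such a common API, sketched as a plain Scala trait with an in-memory implementation for illustration (the trait name, method signatures, and record formats are assumptions, not an existing CDAP or BIMBQM API; a Spark-based implementation would back the abstract methods with Dataset access):

```scala
// Sketch of the common API model. Shared logic lives in the trait; each
// component supplies only the storage access.
trait QuoteApi {
  def lookupInstrumentId(isin: String): String   // ad-hoc Instrument query
  def save(rowKey: String, quote: String): Unit  // persist the quote

  // Shared logic, identical across Spark and non-Spark components:
  def buildQuote(record: String): (String, String) = {
    val isin = record.split(',')(0)
    val permid = lookupInstrumentId(isin)
    ("QUOTE:" + record, s"quote(isin=$isin, permid=$permid)")
  }
}

// In-memory implementation, standing in for a Dataset-backed one:
class InMemoryQuoteApi extends QuoteApi {
  private val instruments = Map("ABCD001EF" -> "PERMID-42")
  val quotes = scala.collection.mutable.Map.empty[String, String]
  def lookupInstrumentId(isin: String): String = instruments(isin)
  def save(rowKey: String, quote: String): Unit = quotes(rowKey) = quote
}

val api = new InMemoryQuoteApi
Seq("ABCD001EF,100.5").foreach { record =>
  val (rowKey, quote) = api.buildQuote(record)
  api.save(rowKey, quote)
}
```

The driver loop at the bottom mirrors the two-line Spark snippet above: each record only touches api.buildQuote and api.save.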
#2. If we cannot get a dataset within Spark transformation functions, then we have to create another RDD that includes the Instrument fields (call it rdd2) when processing each batch of messages, and join rdd2 with the RDD in the DStream above.
Sample code:
val isins = dstream.map(record => getIsin(record)).collect() // collect() transfers the ISINs from the Spark executors to the Spark driver; they are different servers
val split = buildSplitQueryConditions(isins) // the 'split' object is used by the CDAP interface to query the dataset and create a new RDD
val rdd2 = context.fromDataset(instrumentDatasetName, split) // rdd2 holds the mapping between each ISIN and its PermID plus other Instrument fields
dstream.foreachRDD { rdd1 => rdd1.join(rdd2) } // join rdd2 with the RDDs in the original DStream so each record gets the mapping between the ISIN and the other fields
dstream.map { record =>
  val quote = buildQuote(record)
}
dstream.foreachRDD(rdd => rdd.map(record => (rowKey, record)).saveAsDataset(quoteDatasetName)) // here we have to call RDD.saveAsDataset to store the records into the dataset
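The collect / split-query / join round trip can be illustrated with plain Scala collections standing in for the RDDs (no Spark here; instrumentDataset plays the role of the ‘Instrument’ Dataset, the Seq plays the role of one micro-batch rdd1, and buildSplitQueryConditions is reduced to a simple map lookup, all purely for illustration):

```scala
// Step 0: the dataset and one incoming micro-batch of records.
val instrumentDataset = Map("ABCD001EF" -> "PERMID-42", "WXYZ900GH" -> "PERMID-77")
val batch = Seq("ABCD001EF,100.5", "WXYZ900GH,99.8")

def getIsin(record: String): String = record.split(',')(0)

// Step 1: ship the ISINs to the driver (the collect() in the sample above).
val isins = batch.map(getIsin).distinct

// Step 2: query the dataset for just those ISINs; this plays the role of rdd2.
val rdd2 = isins.map(i => i -> instrumentDataset(i)).toMap

// Step 3: join the Instrument fields back onto each record and build the quotes.
val quotes = batch.map { record =>
  val isin = getIsin(record)
  "QUOTE:" + record -> s"quote(isin=$isin, permid=${rdd2(isin)})"
}.toMap
```

Step 1 is the expensive part: in real Spark the ISINs cross the network from the executors to the driver on every batch, which is exactly the cost described below.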
Disadvantages:
1) Performance.
To create the joined rdd2, we have to build a query-condition object (the ‘split’ in the sample above), which means transferring data (the ISIN in the sample, possibly more in the real product) from the Spark executor servers to the Spark driver server. Compared with implementation #1, this performs worse because data has to be transferred between servers for EACH message.
2) In implementation #2 we have to call the RDD.saveAsDataset() function to save the records. This is tightly coupled to the RDD, a fundamental Spark-specific concept, so we cannot build one common API model for both Spark and non-Spark programs.
Note:
Why don’t we create rdd2 once at Spark job startup, to avoid the data transfer between servers shown in the sample above? Two reasons:
1) The data in the dataset (the Instrument data in the sample above) changes dynamically; it is not static. Our Spark Streaming job may start once and keep running for a week, so data loaded at startup could become stale.
2) There are more than 10 million records, and each row may have many columns, so pre-loading all of them at startup is not a good idea.
So we need to do the ad-hoc query while processing the data.
Thanks,
Fanchao