How to save RDD which includes multiple columns in CDAP Spark program


fancha...@gmail.com

Sep 25, 2016, 9:53:59 PM
to CDAP User
Hi,

I have successfully stored key-value rows by using saveAsDataset in my CDAP Spark job. However, may I ask how to save a row that has multiple columns?

I found a sample https://github.com/caskdata/cdap/blob/9beb5dbe5b8a7d00c243c3a57972d72f2219ce84/cdap-unit-test/src/test/scala/co/cask/cdap/spark/app/ScalaStreamFormatSpecSpark.scala
However, it requires pre-defining the dataset schema via a data class ('Person' in this sample).

In our project, the schema is passed to the program in an Avro message, so I don't want to pre-define the schema when creating the dataset. I want to make it flexible enough that I can save any columns received from the Avro message. I think there are a few ways to do this:
1. It would be perfect if I could create a Put object in the RDD that includes the columns and values (not just key-value pairs), then use saveAsDataset to store the row. Does CDAP provide such a capability? Is there any sample?
2. Alternatively, I have tried to get the dataset via getDataset in the Spark program and then call its put function to save the Put object. However, this requires collecting the data from the Spark executors to the Spark driver (sample: https://github.com/caskdata/cdap/blob/9beb5dbe5b8a7d00c243c3a57972d72f2219ce84/cdap-unit-test/src/test/scala/co/cask/cdap/spark/app/ScalaSparkLogParser.scala), which performs badly since the data has to be transferred between servers. Is there any way to get a dataset in a Spark executor (directly in RDD.map(), not via DatasetContext.getDataset in the Spark driver) so that I can use table.put() to save the Put object?
3. Or is there any other way to do this?


Thanks,
Fanchao

fancha...@gmail.com

Sep 26, 2016, 2:34:50 AM
to CDAP User, fancha...@gmail.com
This question is resolved. Use RDD.saveAsDataset on an RDD of (byte[] rowKey, Put) pairs to save the rows into the dataset (a Table-type dataset).


Thanks,
Fanchao

fancha...@gmail.com

Sep 27, 2016, 3:44:20 AM
to CDAP User, fancha...@gmail.com
I want to reopen this topic to discuss how to use datasets in a CDAP Spark program.

Reason to reopen topic:
In my last reply, this topic was resolved by using the RDD.saveAsDataset(key, Put) function. However, this approach binds the RDD and the dataset together tightly, which makes it hard to split the program into different modules.

Topic:
Is there any way to get a dataset in a Spark executor (within the RDD.map() function), so that we can write data into the dataset by calling the dataset table object directly?
As I mentioned in point #2 of the original topic, I found a sample that gets the dataset via datasetContext.getDataset(): https://github.com/caskdata/cdap/blob/9beb5dbe5b8a7d00c243c3a57972d72f2219ce84/cdap-unit-test/src/test/scala/co/cask/cdap/spark/app/ScalaSparkLogParser.scala. However, this can only be done in the Spark driver process, which means we have to collect the data from the RDD to the Spark driver and then save it. We cannot get the dataset within a Spark executor (within RDD.map()), because the datasetContext is not serializable. Likewise, in my testing, when I obtained the dataset table object in the Spark driver process and tried to pass it into the RDD, it reported an exception that the table object is not serializable either. So, apart from calling RDD.saveAsDataset, is there any way to get the dataset object in a Spark executor (within RDD.map()) and save the data?


Thanks,
Fanchao

vin...@cask.co

Sep 27, 2016, 4:48:02 PM
to CDAP User, fancha...@gmail.com
Hi Fanchao,

Datasets in Spark programs can be accessed using datasetContext.getDataset(). However, a dataset acquired through getDataset() cannot be used inside RDD.map(). That feature is on our roadmap.

Please let us know if this answers your question.

Thanks,
Vinisha

Terence Yim

Sep 28, 2016, 10:17:22 AM
to cdap...@googlegroups.com, fancha...@gmail.com
Hi Fanchao,

May I ask why you want to write to the dataset directly from the executor instead of using RDD.saveAsDataset()? Using saveAsDataset fits the Spark execution model better (functional, with no side effects from function evaluation), and the performance is also much better.

Terence

--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/2b415633-b491-4eab-a94a-d5c9349973b1%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

fancha...@gmail.com

Sep 28, 2016, 9:56:04 PM
to CDAP User, fancha...@gmail.com
Hi Terence,

We want to get and use the dataset directly for the reasons below:
1. We want to separate the data-processing module from the data-store module, which makes the system layers clearer. So, in the CDAP Spark program, we don't want to call RDD.saveAsDataset(), because it binds the data-processing logic and the data-store logic together.
2. Building on the first reason, we will implement a common data-store API module (let's call it the API) that should be shared by all the other components in the whole system, not only by the CDAP Spark program. For this purpose, the API should be designed to accept and store data objects directly, not be limited to Spark-specific programming constructs (such as RDDs). If we pass an RDD to the API and it calls RDD.saveAsDataset, then although we have separated the data-processing and API modules, the API is Spark-specific rather than a common module shared by the whole system.
3. Our data model is a little more complex: one input record might result in several different output data objects, there might be relationships between them, and sometimes we may need to read other datasets while processing the inputs. However, using RDD.saveAsDataset, we can only build one kind of output data object in an RDD and save it into one dataset, and we cannot read from another dataset within the RDD based on the input data (suppose we receive a certain kind of record and need to look up data from another dataset while processing it in the RDD). Conversely, if we could get the dataset table object within RDD.map(), we could control how to build a set of output objects from each record in the RDD and store them together, which would give us fine-grained control over the data.

For the above reasons, we want to get the dataset in the Spark executor (that is, call it within something like RDD.map()) to save the data objects, so that we can design a common API for the data-store layer, not only for the Spark program but also for all the other (non-Spark) components in the whole system, and gain more control over the data.

By the way, may I ask whether there is a timeframe for when this feature will be added to CDAP?


Thanks,
Fanchao

Terence Yim

Oct 2, 2016, 7:38:01 PM
to cdap...@googlegroups.com, fancha...@gmail.com
Hi Fanchao,

Let me explain why writing to a dataset (or any data store) directly from a Spark closure function is not a good idea.

1. Spark follows a functional programming paradigm and assumes that functions operating on RDDs have no side effects. Spark relies on this for failure recovery: e.g., if a node fails, it can simply recompute the RDD partitions that resided on that node from the RDD lineage, by re-invoking all the transformation functions between the base RDD and the target RDD. However, if there are side-effecting operations in those transformation functions, the computation can be off. For example, if you perform a write-after-read type of operation, the value written out can be wrong, and all subsequent uses of the RDD can produce wrong results as well.
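This point can be illustrated without Spark at all. The sketch below (plain Scala, no Spark or CDAP APIs; all names are made up for illustration) simulates a failure-recovery recomputation by simply running the same "transformation" twice: the functional result is identical both times, but the side-effecting write to external state is applied twice.

```scala
import scala.collection.mutable

// A mutable "external store", standing in for a dataset written from a closure.
val store = mutable.Map.empty[Int, Int].withDefaultValue(0)

// A map-style function with a side effect: it also writes to the store.
def transform(x: Int): Int = {
  store(x) = store(x) + 1 // side effect: write to external state
  x * 2
}

val partition = Seq(1, 2, 3)

// First evaluation of the "partition".
val result1 = partition.map(transform)

// Simulated failure recovery: Spark recomputes the partition from lineage,
// re-invoking the same function -- and the side effect happens again.
val result2 = partition.map(transform)

// The functional result is stable, but the external state is now wrong:
// each key was incremented twice instead of once.
println(store(1)) // 2, not the intended 1
```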

2. Spark won't invoke any transformation until an action triggers it (http://spark.apache.org/docs/latest/programming-guide.html#actions). The RDD.saveAsDataset provided by CDAP is also an action. As you can see, all the actions either collect some results back to the driver or save the entire RDD to some storage. With your proposal, you would always need to invoke an action that is guaranteed to go through all the objects in the RDD, which will essentially perform worse than using any of the RDD.saveAsXXX methods. For example, you could not use the RDD.first() action, because it doesn't necessarily go through all the objects in the RDD (depending on the implementation).
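The laziness point can likewise be sketched in plain Scala using a collection view as a stand-in for an RDD (illustrative only; a view is not an RDD, but it similarly defers work until something forces it):

```scala
var invocations = 0

// The map is declared but, like an RDD transformation, nothing runs yet.
val mapped = List(1, 2, 3).view.map { x =>
  invocations += 1 // stands in for a side effect such as a dataset write
  x * 2
}

println(invocations) // 0 -- no "action" has been invoked

// Forcing the view plays the role of an action such as saveAsDataset:
// only now does the function run, once per element.
val all = mapped.toList

println(invocations) // 3
```

A dataset write buried in the map function would thus never happen until some action forced the whole view, which is exactly the guarantee an executor-side write would depend on.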

3. For your third point, it would be good if you could give me a more concrete example. For the time being, I think it can be done by having a base RDD and transforming it into different RDDs: some for saving to datasets, some for further use.

Terence


fancha...@gmail.com

Oct 6, 2016, 1:12:22 AM
to CDAP User, fancha...@gmail.com
Hi Terence,

Thanks for the explanation.

Please see a concrete example below:
We need to query data from some datasets while processing the input data. For example, we need to get a unique ID (a kind of pre-defined internal unique ID, like a dictionary lookup) when we receive a certain code in an input record. We don't want to pre-load all such mappings before processing the input data, since the mapping is huge. Instead, we want to load the required data based on the inputs. If we cannot get the table dataset object within an RDD transformation function, we cannot query the data from the dataset. Is there any way to solve this in a CDAP Spark program?


Regards,
Fanchao

Terence Yim

Oct 6, 2016, 2:14:16 AM
to cdap...@googlegroups.com, fancha...@gmail.com
Hi,

That's something like a lookup use case, right? Adding support for read-only datasets is OK. Do you have any cases that actually need to write from a transformation function?

Terence

Sent from my iPhone

Terence Yim

Oct 6, 2016, 2:16:55 AM
to cdap...@googlegroups.com, fancha...@gmail.com
To clarify what I meant by "read-only is OK": if the dataset being read is immutable for the duration of the Spark program's execution, then it's fine. This is because if a failure triggers RDD partition recomputation, the value will simply be read again.

Terence

Sent from my iPhone

Terence Yim

Oct 6, 2016, 2:24:00 AM
to cdap...@googlegroups.com, fancha...@gmail.com
Also, the idiomatic way of doing this in Spark is actually to use the join function. As I explained earlier, creating an RDD won't necessarily load all the data; it will only create the partitions needed for the completion of an action. For example, if you create a lookup-ID RDD and join it against the input, only the partitions containing the IDs present in the input data will get loaded. Depending on how the partitioning looks and how sparse the data is, it may load much less than the whole table. Another good thing about using an RDD join is that you can reuse the lookup RDD, since it becomes persisted in the Spark framework's memory.
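As a plain-Scala sketch of this join idiom (ordinary collections standing in for RDDs; the names and data are invented for illustration), keying the input by its lookup identifier and joining against the lookup table replaces the per-record point read:

```scala
// The "lookup RDD": identifier -> internal unique ID.
val lookup: Seq[(String, Long)] = Seq(
  ("US001ASE78", 1001L),
  ("GB002XYZ99", 1002L)
)

// The input, already keyed by the identifier to be looked up.
val input: Seq[(String, String)] = Seq(
  ("US001ASE78", "record-a"),
  ("US001ASE78", "record-b"),
  ("GB002XYZ99", "record-c")
)

// An inner join keyed on the identifier, like inputRdd.join(lookupRdd):
// each input record is paired with its internal ID, no point reads needed.
val lookupMap = lookup.toMap
val joined: Seq[(String, (String, Long))] = input.collect {
  case (isin, rec) if lookupMap.contains(isin) => (isin, (rec, lookupMap(isin)))
}

println(joined.size) // 3 -- every input record paired with its internal ID
```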

Terence

Sent from my iPhone

zanso...@gmail.com

Oct 7, 2016, 3:50:25 AM
to CDAP User, fancha...@gmail.com
Terence,

We are still looking forward to the new feature of getDataset() from within RDD.map(). The reason is that we have different paths to update HBase, either from CDAP Spark or from the CDAP Dataset API, but we want to build a consistent API so that updates from both Spark and the Dataset API are always consistent, without maintaining two sets of programs.
For example:
API.doSomething() {
    DataSet ds = getDataSet("A");
    // build data object a
    ds.put(objectA);
    ds = getDataSet("B");
    // build data object b, maybe do some joins as well
    ds.put(objectB);
    ...
}

Using the workaround you provided, we have to do something like:

CDAP Spark:
rddMapping = loadSomeRDDMapping4Join()
rddA = rdd.map(...)
rddB = rdd.map(...).join(rddMapping)
rddA.saveAsDataset(...)
rddB.saveAsDataset(...)
...

CDAP (no Spark):
API.doSomething()  // see the sample above

Without this feature, we have to maintain two sets of APIs. That works, but it is more work and will be hard to maintain as the system grows.

eaganm...@gmail.com

Oct 7, 2016, 8:07:13 AM
to CDAP User, fancha...@gmail.com
This sounds like a good subject for the training this upcoming week. I will make sure Russ and David are aware.

Terence Yim

Oct 7, 2016, 12:26:45 PM
to CDAP User, fancha...@gmail.com, zanso...@gmail.com
Hi,

I understand the code-maintenance point of view. However, as I pointed out earlier, Spark has a fundamentally different data and computation model than imperative computation (e.g., your API.doSomething() example), and I'm afraid one library can't fit both. Using CDAP datasets does help a bit, though. When a CDAP dataset is used in Spark for either reading or writing, it goes through the BatchReadable/BatchWritable interfaces, which the dataset class itself implements. This means you can reuse the same data logic in the dataset (through private methods in the dataset class) for both point access and batch access.
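One way to picture this shared-logic idea (a hedged sketch in plain Scala; the class and methods are hypothetical, not the actual CDAP BatchReadable/BatchWritable interfaces) is to keep the keying/encoding in one private method that both the point-access path and the batch-style path call:

```scala
import scala.collection.mutable

// Hypothetical store: an in-memory map stands in for the backing table.
class InstrumentStore {
  private val table = mutable.Map.empty[String, String]

  // Shared data logic: how a record is keyed and encoded lives in one place.
  private def encode(isin: String, permId: Long): (String, String) =
    (s"row:$isin", s"permid=$permId")

  // Point access, for imperative callers.
  def write(isin: String, permId: Long): Unit = {
    val (k, v) = encode(isin, permId)
    table(k) = v
  }

  // Batch access: a saveAsDataset-style path would invoke this per record,
  // reusing the same private encoding logic.
  def writeAll(records: Seq[(String, Long)]): Unit =
    records.foreach { case (isin, id) => write(isin, id) }

  def size: Int = table.size
}

val instStore = new InstrumentStore
instStore.write("US001ASE78", 1001L)            // point path
instStore.writeAll(Seq("GB002XYZ99" -> 1002L))  // batch path
println(instStore.size) // 2
```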

Terence

zanso...@gmail.com

Oct 9, 2016, 11:10:50 PM
to CDAP User, fancha...@gmail.com, zanso...@gmail.com
Hello,

Inside API.doSomething(), we'd like to apply an Adapter design pattern, or directly pass a dataset or dataset name into it, so that the API does not depend on either Spark or Dataset. That's why we want to get a dataset in Spark, and why the Spark executor should also support getting a dataset. In any case, we can discuss this in the training this week.
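A minimal sketch of that adapter idea in plain Scala (all names are hypothetical; a real adapter would wrap a CDAP dataset rather than an in-memory map): the processing logic depends only on a small store trait, so the same code can be driven by Spark or non-Spark callers.

```scala
import scala.collection.mutable

// The abstraction the processing code depends on -- no Spark, no Dataset.
trait QuoteStore extends Serializable {
  def put(rowKey: String, value: String): Unit
}

// One adapter; another could wrap a dataset, HBase table, etc.
class InMemoryQuoteStore extends QuoteStore {
  val rows = mutable.Map.empty[String, String]
  def put(rowKey: String, value: String): Unit = rows(rowKey) = value
}

// Processing logic written once against the trait.
def saveQuotes(records: Seq[(String, String)], store: QuoteStore): Unit =
  records.foreach { case (k, v) => store.put(k, v) }

val quoteStore = new InMemoryQuoteStore
saveQuotes(Seq("q1" -> "quoteA", "q2" -> "quoteB"), quoteStore)
println(quoteStore.rows.size) // 2
```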

fancha...@gmail.com

Oct 11, 2016, 3:13:17 AM
to CDAP User, fancha...@gmail.com, zanso...@gmail.com
Hi,

I would like to give a detailed case to help our discussion.

Case:
We want to use the CDAP Spark Program building block to build a pipeline to process part of our data. When we receive a record from upstream, we format it and extract some required fields from the raw data. This is done in a series of RDD transformation functions. Then we need to do an ad-hoc query to get some additional fields from a dataset (an HBase-table-based dataset). For example, given an identifier 'US001ASE78', we do an ad-hoc query to get the related internal unique ID and other fields for this identifier from another dataset (call it 'datasetA'). Then, after some other RDD transformation operations, we finally build the object to be saved into our core store (not datasetA).

In the above case, generally speaking, if we cannot get the dataset within RDD transformation functions, we cannot do ad-hoc queries against the dataset within those functions, and that's a big limitation for us. For this case, we cannot even pre-load the data from datasetA into another RDD and use a join operation, because the data in datasetA is not static; it keeps growing. So we have to execute the ad-hoc query while processing the data in our pipeline, inside the RDD transformation functions.

Another reason we need to get the dataset within the RDD is, as Zansong mentioned, that we want to build a consistent module for the data layer, not separate APIs for Spark and non-Spark programs in our system.


Thanks,
Fanchao

fancha...@gmail.com

Oct 16, 2016, 11:08:43 PM
to CDAP User, fancha...@gmail.com, zanso...@gmail.com
We have a CDAP Spark program (Spark Streaming, not batch) to process the data.

Problem:
A CDAP Spark program cannot get a dataset within Spark transformation functions.

Use Case Steps:
1.    Get records from the upstream source (a Kafka queue).
2.    Extract the value of an identifier named 'ISIN'; e.g., the value is 'ABCD001EF'.
3.    Do an ad-hoc query against the 'Instrument' dataset to get the values of some additional fields (e.g., the instrument permid for this ISIN).
4.    Build the Quote and related Identifier/Relationship records.
5.    Save the Quote and related records into the dataset.
In the Spark programming model, all the data-processing steps (steps 2 to 5) are implemented in Spark transformation functions, which run on the Spark executors (servers in the YARN cluster).

Implementations:
#1.    If we can get the dataset within Spark transformation functions, the pseudo-code will look like the following (the real code may be a little more complex, but the basic logic is the same):

    dstream.map( record => {
        val isin = getIsin(record)
        val instrumentTable = getDataset(instrumentDatasetName)
        val instrumentId = instrumentTable.get(rowKey)        // the rowKey is built from the ISIN
        val quote = buildQuote(record, instrumentId, isin)    // quote includes Quote / Identifier / Relationship fields
        val quoteTable = getDataset(quoteDatasetName)
        quoteTable.write(quoteRowKey, quote)                  // save the quote; real code may use multiple tables here
    })

    Advantages:
    1)    Performance.
    In the sample code above, everything (ad-hoc query, building the record, saving the record) can be done within Spark transformation functions, so all the records can be processed in parallel across the DStreams running on the Spark executors. (We could also split the logic into a few separate RDD transformation functions if necessary, depending on the design, but that doesn't affect the basic logic.)
    2)    Common API model.
    We can also build a common API model to handle such data-processing logic, not only for the Spark program but also for all the other components (tools/services) in the BIMBQM system.
    Using a common API, the Spark code might look like:
        dstream.map( record => {
            val quote = api.buildQuote(record)
            api.save(quote)
        })
    In the above code, the api.buildQuote and api.save functions implement the logic shown in the first code snippet, so that the common logic can be shared across the different components using the common API.

#2.    If we cannot get a dataset within Spark transformation functions, then we have to create another RDD that includes the Instrument fields (call it rdd2) when processing each message, and join rdd2 with the RDD in the DStream above.
    Sample code:
        val isin = dstream.map( record => getIsin(record) ).collect()    // collect() transfers the data (ISINs) from the Spark executors to the Spark driver; they are different servers
        val split = buildSplitQueryConditions(isin)                      // 'split' is used by the CDAP interface to query the data from the dataset and create a new RDD
        val rdd2 = context.fromDataset(instrumentDatasetName, split)     // rdd2 contains the mapping between ISIN and permid, plus other Instrument fields for these specific ISINs
        dstream.foreachRDD( rdd1 => { rdd1.join(rdd2) } )                // join rdd2 with the RDDs in the original DStream so they know the mappings between ISIN and the other fields
        dstream.map( record => {
            val quote = buildQuote(record)
        })
        dstream.foreachRDD( rdd => rdd.map(record => (rowKey, record)).saveAsDataset(quoteDatasetName) )    // here we have to call RDD.saveAsDataset to store the records

    Disadvantages:
    1)    Performance.
        To create the joined rdd2, since we have to build a query-condition object (the 'split' in the sample above), we have to transfer data (the ISIN in the sample, maybe more in the real product) from the Spark executor servers to the Spark driver server. Compared with implementation #1, this performs worse, since data has to be transferred between servers for EACH message.
    2)    In implementation #2, we have to call RDD.saveAsDataset() to save the records. This is tightly coupled to the RDD, a fundamental Spark-specific concept, so we cannot build one common API model for both Spark and non-Spark programs.

    Note:
    Why don't we create rdd2 when the Spark job starts up, to avoid the data transfer between servers in the above sample? Two reasons:
    1)    The data in the dataset (the Instrument data in the above sample) changes dynamically; it is not static. Our Spark Streaming job may start and then keep running for a week, so data loaded at startup may become stale.
    2)    There are more than 10 million records, and each row may have many columns, so it may not be a good idea to pre-load all of them at startup.
    So we need to do the ad-hoc query while processing the data.


Thanks,
Fanchao

Terence Yim

Oct 17, 2016, 1:28:24 AM
to cdap...@googlegroups.com, fancha...@gmail.com, zanso...@gmail.com
Hi Fanchao,

One quick question about your #1 proposal (I'll address the rest slightly later): is the map() operation the only operation on the DStream? Have you tried running a Spark Streaming job with only a map() operation? I am pretty sure it won't trigger any actual data processing, since Spark in general (including Spark Streaming) requires an action on an RDD to trigger any processing. A map function on a DStream is actually translated into a sequence of RDD.map calls, one for each RDD generated by the Spark Streaming framework (based on the batch interval).

Terence

