"Periodically update the Dataset" And "ReadFromDataset in spark job"

114 views
Skip to first unread message

Young Liu

unread,
Dec 16, 2014, 10:58:40 AM12/16/14
to cdap...@googlegroups.com
Hi Folks,
Recently our study team have finished the prototype of "Real-Time-Traffic".I had asked for help before. https://groups.google.com/forum/#!topic/cdap-user/Mj2YebVWyvA
1.App has two datasets :  
(1).one is "BM",a static KeyValueTable ,the key is combination of latitude and longitude. the value is roadId of one road.
(2).another is "RTM",a dynamic Table, the rowKey is roadId, and the columns are gpsPoints belong to it.Just like  {roadId:{point1,point2,......}}

2.App has one Flow containing two flowlets.
(1)one is "parser",input is stream of current gps infos , and output is GpsPoint object(has many fields).
(2)another is "matcher", input is GpsPoint object, and utilizes the "BM" dataset to search which roadId this object belongs to and write the results to "RTM" dataset.

3.App has one spark job.
input is "RTM" dataset, and do some iterative computation, and write the results to dataset "RES".

PS: Indeed, we can just use flow for finishing my job, but in future, we may use complex algorithm to process "RTM" so that the spark is necessary.

Here are some puzzles:

1.The Spark API:
<T> T readFromDataset(String datasetName,Class<?> kClass,Class<?> vClass)
As you can see, dataset "RTM" is a Table, it has not key and value concepts.So,
Does this method only take KeyValueTable as input Dataset? Should i change the type of "RTM",if not ,how can i use this ?(Maybe i can make the "RTM" as
a KeyValueTable , the key is roadId , and the value is List<GpsPoint> , but this is necessary?)

2.periodically update the DateSet.
Dataset "RTM" is dynamic, we must update it at least every 30s.I have read through the documents, but i can't find the effective method to update the whole existed Dataset. This problem is same to the Dataset "RES".(Both need to be update periodically.)
The solutions i had thought about are:(1)using the @tick flowlet to delete the content of "RTM"(we don't need the info before) . (2)using javaclient to truncate the "RTM"
In a word:we don't need the "RTM" continuously increase.More exactly ,we just want to use a time window to restrict the input of spark job. How can i effectively achieve this goal?

3.how can i schedule the spark job?
Assume that the Dateset "RTM" is updated every 30s, how can i start spark job every 30s? the workflow do not support spark job now.

Here are some questions about performance.
1.  we expect the Dateset "BM" can be stored in-memory (277MB), I had found there is a interface in javadoc  "public interface MemoryTable extends Table"
But i can't find any implements,do i miss something?

2.In this use case . If we add the @batch and @HashPartion in the "matcher" flowlet and make many instances of this flowlet , will the performance improve ?

Cheers,
-Young

Rohit Sinha

unread,
Dec 16, 2014, 3:09:10 PM12/16/14
to cdap...@googlegroups.com
Hello Young,
Congrats for finishing your first prototype of  "Real-Time-Traffic". We look forward to seeing it in action and will do our best to help you through the implementation process. Please find some answer to your Spark related questions below: 

1. The Spark API:

<T> T readFromDataset(String datasetName,Class<?> kClass,Class<?> vClass)

Its BatchReadable<byte[], Row> so inherently it does has a key. If you just provide byte[] as the key class and Row as the value class you should be able to read it and create an RDD out of it where the values (Row) is which you are interested in and you can process it however you want.

3.how can i schedule the spark job?

We working to add Spark in our workflow. I have completed the implementation and Spark support in Workflow will be released in our 2.7.0 release. You can track the details of the issue here: https://issues.cask.co/browse/CDAP-465
I will have a PR out in sometime which will be merged to develop after review. If you want to try it out you can use our under development 2.7.0 release from the develop branch of CDAP repo on Github. 

Young Liu

unread,
Dec 16, 2014, 8:34:48 PM12/16/14
to cdap...@googlegroups.com
Hi Rohit,
Thanks for clearly answering the Spark related questions. I know how to use this method now and i need to try under development 2.7.0 release.

Cheers,
-Young

在 2014年12月17日星期三UTC+8上午4时09分10秒,Rohit Sinha写道:

Alex Baranau

unread,
Dec 16, 2014, 9:24:32 PM12/16/14
to Young Liu, cdap...@googlegroups.com
Re 2nd question:

I think there are two things in play:
1) limiting how long do you need or want to retain the data in RTM dataset. To limit that, you can specify "dataset.table.ttl" property for that table-based dataset [1].
2) selecting the input for the Spark job (e.g. last 30sec worth of data). We have this ability for mapreduce jobs, but not yet for Spark. It is in works now: please watch https://issues.cask.co/browse/CDAP-1042!

I am not sure what do you mean by update the whole existing dataset. Do you mean drop the whole data and start writing again? What triggers the drop of the data if you are continuously processing data from stream as it arrives?

May be you are considering drop dataset, start writing again as a workaround for specifying selection for Spark program input? I believe specifying selection is a lot safer.

Young, these questions of yours are truly great and I would like to encourage you to ask more of them: they really help us to improve functionality around iterative processing with Spark on CDAP. Please bear with us thru this process!

Thank you,
Sasha

[1] From the javadoc:

From 

  /**
   * Property set to configure time-to-live on data within this dataset. The value given is in milliseconds.
   * Once a cell's data has surpassed the given value in age,
   * the cell's data will no longer be visible and may be garbage collected.
   */




--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/cdap-user/beeed8c5-9b4f-45ad-a174-4f84a950d8c6%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Alex Baranau

unread,
Dec 16, 2014, 9:35:42 PM12/16/14
to Young Liu, cdap...@googlegroups.com
On performance questions:

1. MemoryTable is a special type of dataset which data is kept only in memory. You can think of it as having a HashMap field, but which updates are transactional. E.g. if you use it in Flowlet, if you put smth in it and transaction fails (processing event) then this change will be rolled back and the data in MemoryTable will not change.

I guess, this is not what you need, but rather you need an in-memory cache of the *whole* persisted dataset. I guess you need it in Flow. Currently to do that, you can read the whole data in initialize method in the Flowlet into private field and use it when processing data. Will that work for your case?

2. As always it depends on your data. I'd assume it will improve a lot if you use batching, as this would mean more effective usage of transactions. When increasing number of flowlets instances, which update same dataset it makes sense to use hashpartitioning. There's a great intro guide on this, that you can find at http://cask.co/guides/. Look at "Realtime Data Processing with a Flow" https://github.com/cdap-guides/cdap-flow-guide specifically.

Hope this helps!
Sasha

Young Liu

unread,
Dec 17, 2014, 8:40:21 PM12/17/14
to cdap...@googlegroups.com, lylr...@gmail.com
Hi alex,
Thanks for your great answer,yes,i considering drop dataset, start writing again as a workaround for specifying selection for Spark program input,but now i will take your suggestion,specifying selection for spark program input.
Which type of Table has this property(can be selected by time range ,May be I think the CounterTimeseriesTable is ok.). 
But the spark job seems can't  make full use of CounterTimeseriesTable.
I expect that if spark could select dataset's content by time range.

I have't found the "dataset.table.ttl" property in the javadoc http://docs.cask.co/cdap/current/en/reference-manual/javadocs/index.html#javadocs
Can you give me the URL?

Cheers,
-Young
在 2014年12月17日星期三UTC+8上午10时24分32秒,alex写道:

Young Liu

unread,
Dec 17, 2014, 8:52:15 PM12/17/14
to cdap...@googlegroups.com, lylr...@gmail.com
Hi alex,
The dataset "BM" will not be changed in the app.It was used for searching.
For example. The input gpsPoint's longitude and lattitude is (45.231245,32.15312),the key of "BM" is the combination of lon and lat.
So i can directly map the (45.231245,32.15312) to the roadId,then i add this point to the row of this roadId.
In this use case , i just want the "BM" can be stored in-memory permanent for eliminating the I/O overhead.
Should I just use the MemoryTable , or read the whole data in initialize?

https://github.com/cdap-guides/cdap-flow-guide  is a great example , thank you.

Cheers,
-Young

在 2014年12月17日星期三UTC+8上午10时35分42秒,alex写道:

Alex Baranau

unread,
Dec 17, 2014, 9:02:51 PM12/17/14
to Young Liu, cdap...@googlegroups.com
read the whole data in initialize

yes. Seems like it matches your case.

Sasha

Alex Baranau

unread,
Dec 17, 2014, 9:03:33 PM12/17/14
to Young Liu, cdap...@googlegroups.com

Alex Baranau

unread,
Dec 17, 2014, 9:05:15 PM12/17/14
to Young Liu, cdap...@googlegroups.com
> Which type of Table has this property(can be selected by time range)

None at this point, :(. Watch for: https://issues.cask.co/browse/CDAP-1042.

Thank you,
Sasha

Young Liu

unread,
Dec 18, 2014, 4:18:20 AM12/18/14
to cdap...@googlegroups.com
Hi Rohit,
Why all the examples in documents use the ObjectStore and use the String.class,Double.class?
where can i find complex example with using different  Datasets and using custom java object .(including serialization and disserialization)

Cheers,
-Young

在 2014年12月17日星期三UTC+8上午4时09分10秒,Rohit Sinha写道:
Hello Young,

Ali Anwar

unread,
Dec 18, 2014, 5:13:26 AM12/18/14
to Young Liu, cdap...@googlegroups.com
Hi Young.

If ObjectStore doesn't satisfy your use case, take a look at KeyValueTable. Using that, you should be able to serialize/deserialize the object in any way you want. For instance, you can encode your custom object in JSON, and store it in the value of the KeyValueTable, and simply decode the JSON value during deserialization.
If that's not clear, let me know.

Thanks,
Ali



--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.

Young Liu

unread,
Dec 18, 2014, 8:57:55 AM12/18/14
to cdap...@googlegroups.com
Hi Rohit,
I have tried your suggestion as below:

37     val linesDataset: NewHadoopRDD[Array[Byte], Row] = sc.readFromDataset("RTM", classOf[Array[Byte]], classOf[Row])
38     val lines = linesDataset.values

I can successfully compile and deploy the app, but i can't run it.

the errors are:

2014-12-18 21:42:10,077 - ERROR [Executor task launch worker-0:o.a.s.Logging$class@96] - Exception in task 0.0 in stage 0.0 (TID 0)

java.io.NotSerializableException: co.cask.cdap.api.dataset.table.Result

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_71]

at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) ~[na:1.7.0_71]

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) ~[na:1.7.0_71]

at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_71]

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]

at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71] 

As you can see , the Result is the implement of Row , but in javadoc "public class Result extends Object implements Row" the Result don't implement the serializable interface.

Now how can i solve this problem?
Should I give up using Table? Should i only use ObjectStore?

Cheers,
-Young

在 2014年12月17日星期三UTC+8上午4时09分10秒,Rohit Sinha写道:
Hello Young,

Young Liu

unread,
Dec 18, 2014, 9:00:51 AM12/18/14
to cdap...@googlegroups.com, lylr...@gmail.com
Hi Ali,
I have tried Rohit's suggestion as below:

37     val linesDataset: NewHadoopRDD[Array[Byte], Row] = sc.readFromDataset("RTM", classOf[Array[Byte]], classOf[Row])
38     val lines = linesDataset.values

I can successfully compile and deploy the app, but i can't run it.

the errors are:

2014-12-18 21:42:10,077 - ERROR [Executor task launch worker-0:o.a.s.Logging$class@96] - Exception in task 0.0 in stage 0.0 (TID 0)

java.io.NotSerializableException: co.cask.cdap.api.dataset.table.Result

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_71]

at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) ~[na:1.7.0_71]

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) ~[na:1.7.0_71]

at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_71]

at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]

at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) ~[org.apache.spark.spark-core_2.10-1.1.0.jar:1.1.0]

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]

at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71] 

As you can see , the Result is the implement of Row , but in javadoc "public class Result extends Object implements Row" the Result don't implement the serializable interface.

Now how can i solve this problem?
Should I give up using Table? Should i only use ObjectStore?
And in fact, I think the ObjectStores can be thought as K/V Table, am i right? 
Cheers,
-Young


在 2014年12月18日星期四UTC+8下午6时13分26秒,ali写道:

Rohit Sinha

unread,
Dec 18, 2014, 4:30:35 PM12/18/14
to Young Liu, cdap...@googlegroups.com
Hello Young,

Our examples are guides to use Spark they don't (and I believe can't) cover all the possible use cases as they are just examples. Generally, we try to keep our example short and ObjectStore allow us to write and read the object back directly without having more lines of code in  Spark programs which converts them into appropriate object from byte[] or something. 

This is where you can help us :) A real world problem opens up  lots of use cases, requirements and bugs and will be really thankful to you if you keep sharing them with us.

As far as using a different dataset type is considered it should be relatively straightforward. From your discussion it looks like you want to use KeyValueDataset. You can use it by including it in your app something like this:


and then if you want to read/write to it from your spark example you do it in the same way as you do for others but just provide the key and value class as byte[]

Hope this helps and do let us know if you face any other difficulty in building your first CDAP app.

Thanks. 

On Thu, Dec 18, 2014 at 1:18 AM, Young Liu <lylr...@gmail.com> wrote:

--
You received this message because you are subscribed to the Google Groups "CDAP User" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cdap-user+...@googlegroups.com.
To post to this group, send email to cdap...@googlegroups.com.


--
Rohit Sinha
Software Engineer
rsi...@cask.co
+1 612 735 5213

Young Liu

unread,
Dec 19, 2014, 12:51:57 AM12/19/14
to cdap...@googlegroups.com, lylr...@gmail.com
Hi Rohit,
I'm very appreciate for your patient answer.
All the answer can help me a step further.
I will make full use of these resources.
In fact,I want to use Table instead of KeyValueTable in the spark Job.
I have passed the byte[] as keyclass, Row as valueclass. But the Row can't be serialized .

Last, every answer for my questions is very useful . Please bear with me with so many questions. :D.

Cheers,
-Young 

在 2014年12月19日星期五UTC+8上午5时30分35秒,Rohit Sinha写道:
Reply all
Reply to author
Forward
0 new messages