Spark Streaming: sliding window example and explanation


ngoc linh

Jan 2, 2014, 4:59:23 AM1/2/14
to spark...@googlegroups.com
Dear Matei, Tathagata,

Could you provide an example and explanation of sliding windows in Spark Streaming? I am confused about the relationship between batchInterval, windowDuration, and slideDuration.

Tathagata Das

Jan 2, 2014, 5:55:26 AM1/2/14
to spark...@googlegroups.com
Hello, 

"Batch interval" is the basic interval at which the system with receive the data in batches. This is the interval set when creating a StreamingContext. For example, if you set the batch interval as 2 second, then any input DStream will generate RDDs of received data at 2 second intervals. 

A window operator is defined by two parameters:
- windowDuration - the length of the window
- slideDuration - the interval at which the window slides or moves forward
It's a bit hard to explain the sliding of a window in words, so slides may be more useful. Take a look at slides 27 - 29 in the attached slides.

Both the window duration and the slide duration must be multiples of the batch interval, as received data is divided into batches of duration "batch interval". Let's take an example. Suppose we have a batch interval of 2 seconds and we have defined an input stream.

val inputStream = ssc.socketStream(...)

This inputStream will generate RDDs every 2 seconds, each containing the last 2 seconds of data. Now say we define a few window operations on this. A window operation is defined as DStream.window(<window duration>, <slide duration>).

val windowStream1 = inputStream.window(Seconds(4))
val windowStream2 = inputStream.window(Seconds(4), Seconds(2))
val windowStream3 = inputStream.window(Seconds(10), Seconds(4))
val windowStream4 = inputStream.window(Seconds(10), Seconds(10))
val windowStream5 = inputStream.window(Seconds(2), Seconds(2))    // same as inputStream
val windowStream6 = inputStream.window(Seconds(11), Seconds(2))   // invalid
val windowStream7 = inputStream.window(Seconds(4), Seconds(1))    // invalid


Both windowStream1 and windowStream2 will generate RDDs containing data from the last 4 seconds, and the RDDs will be generated every 2 seconds (if the slide duration is not specified, as in windowStream1, it defaults to the inputStream's batch duration, i.e. 2 seconds). Note that these windows of data overlap: the window RDD at time 10 will contain data from times 6 to 10 (i.e. slightly after 6 to the end of 10), and the window RDD at time 12 will contain data from 8 to 12.

Similarly, windowStream3 will generate RDDs every 4 seconds, each containing data from the last 10 seconds. windowStream4 will generate non-overlapping windows, that is, RDDs every 10 seconds, each containing data from the last 10 seconds. windowStream5 is essentially the same as the inputStream.

windowStream6 and windowStream7 are invalid because one of the two parameters is not a multiple of the batch interval of 2 seconds. This is how the three durations are related.
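For completeness, a windowed count over one of the streams above could look like this (just a sketch; count() and print() are ordinary DStream operations, and nothing runs until the context is started):

// Count the records in each 4-second window, sliding every 2 seconds,
// and print the result on the driver each time the window slides.
val windowCounts = windowStream2.count()
windowCounts.print()

// Nothing is received or computed until the context is started.
ssc.start()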

Hope that helps. Note that I simplified a few details that become important when you want to define window operations over already-windowed streams; I am ignoring them for now. Feel free to ask more specific questions.

TD

Scala Days 2013 - Spark Streaming.pptx

ngoc linh

Jan 3, 2014, 3:41:02 AM1/3/14
to spark...@googlegroups.com
Thanks so much for your explanation. I would like to ask more specifically:


1. About batchInterval:
Given pseudo code block:

inputStream = readStream("http://...", "2s")
//start Block_1
ones = inputStream.map(event => (event.url, 1))
counts = ones.runningReduce((a, b) => a + b)
//end Block_1

So, does that mean Block_1 runs every 2 seconds (I was thinking of it as: while(true) { execute Block_1; sleep(2s); })?
And would the lifecycle of the DStream inputStream be like the following?
@t_0: execute Block_1 with inputStream = [RDD@t_0]
@t_2: execute Block_1 with inputStream = [RDD@t_0, RDD@t_2]
@t_4: execute Block_1 with inputStream = [RDD@t_0, RDD@t_2, RDD@t_4]


2. About window method:
Given pseudo code block:

inputStream = readStream("http://...", "2s")
windowStream = inputStream.window(Seconds(8), Seconds(4))
//start Block_2
ones = windowStream.map(event => (event.url, 1))
counts = ones.runningReduce((a, b) => a + b)
//end Block_2

So, does that mean Block_2 runs every 4 seconds (I was thinking of it as: while(true) { execute Block_2; sleep(4s); })?
And would the lifecycle of the DStream windowStream be like the following?
@t_0 -> @t_7: no window completed, Block_2 does not execute.
@t_8: execute Block_2 with windowStream = [RDD@t_0->8]
@t_12: execute Block_2 with windowStream = [RDD@t_0->8, RDD@t_4->12]
@t_16: execute Block_2 with windowStream = [RDD@t_0->8, RDD@t_4->12, RDD@t_8->16]


3. About the real-time data store
I have read a paper about a real-time datastore named Druid (from Metamarkets, if I remember correctly).
In short, Druid = streaming engine (Kafka + Storm) + ZooKeeper + real-time nodes + historical nodes (it is said that Druid can query 6 TB of in-memory data in 1.4 seconds).
The question is: if I want to build a real-time data store solution on the Spark platform (or BDAS), what is the recommended model?


Looking forward to your help.

Tathagata Das

Jan 3, 2014, 4:34:21 AM1/3/14
to spark...@googlegroups.com
Responses inline. I hope they help.

On Fri, Jan 3, 2014 at 12:41 AM, ngoc linh <esvn...@gmail.com> wrote:
Thanks so much for your explanation. I would like to ask more specifically:


1. About batchInterval:
Given pseudo code block:

inputStream = readStream("http://...", "2s")
//start Block_1
ones = inputStream.map(event => (event.url, 1))
counts = ones.runningReduce((a, b) => a + b)
//end Block_1

So, does that mean Block_1 runs every 2 seconds (I was thinking of it as: while(true) { execute Block_1; sleep(2s); })?
And would the lifecycle of the DStream inputStream be like the following?
@t_0: execute Block_1 with inputStream = [RDD@t_0]
@t_2: execute Block_1 with inputStream = [RDD@t_0, RDD@t_2]
@t_4: execute Block_1 with inputStream = [RDD@t_0, RDD@t_2, RDD@t_4]

Not really. The right way to think about this program is as follows. When the line "inputStream = streamingContext.readStream(..)" is executed, it just records in the streaming context that when data is received, it will be divided into batches and each batch will be represented internally by an RDD. The sequence of RDDs that will be produced is represented as the DStream "inputStream". Nothing is actually received until streamingContext.start() is executed later.

Then when the second line "ones = inputStream.map(event => (event.url, 1))" is executed, it also sets up the information that every RDD (i.e., batch of data) generated by the inputStream will be transformed by the operation RDD.map(event => (event.url, 1)). Again, this will only happen once data is received after streamingContext.start(). This sequence of mapped RDDs is represented by the ones DStream. So while Block_1 is executed only once, it internally sets up the computation so that every batch of received data undergoes the map operation. In terms of batches, it will be:
@t_0: execute [inputStream's RDD@t_0].map(event => (event.url, 1))
@t_2: execute [inputStream's RDD@t_2].map(event => (event.url, 1))
@t_4: execute [inputStream's RDD@t_4].map(event => (event.url, 1))

Similarly, the 3rd line also sets up the computation that will add up the counts. In this case it would mean a continuous cumulative count.
@t_0: counts of [ones's RDD@t_0]
@t_2: add count of [ones's RDD@t_2] with previous counts@t_0
@t_4: add counts of [ones's RDD@t_4] with previous counts@t_2

Note that your code is only the pseudocode from the paper, which hides the StreamingContext of the actual API. It may give you more clarity to run the streaming examples in the Spark repo and take a look at the actual API; a rough sketch against the real API follows.
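For example, Block_1 against the actual Scala API might look something like this (a sketch only: I am assuming each line received on the socket is a URL, and using updateStateByKey to play the role of the paper's runningReduce):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // brings in pair-DStream operations

val ssc = new StreamingContext("local[2]", "RunningCounts", Seconds(2))
ssc.checkpoint("checkpoint")                            // required for stateful operations

// Stand-in for readStream(...): each line received on the socket is treated as a URL.
val inputStream = ssc.socketTextStream("localhost", 9999)
val ones = inputStream.map(url => (url, 1))

// Cumulative per-URL count across all batches, i.e. the "runningReduce" of the pseudocode.
val counts = ones.updateStateByKey[Int] { (newOnes: Seq[Int], prev: Option[Int]) =>
  Some(prev.getOrElse(0) + newOnes.sum)
}

counts.print()
ssc.start()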
 
 

2. About window method:
Given pseudo code block:

inputStream = readStream("http://...", "2s")
windowStream = inputStream.window(Seconds(8), Seconds(4))
//start Block_2
ones = windowStream.map(event => (event.url, 1))
counts = ones.runningReduce((a, b) => a + b)
//end Block_2

So, does that mean Block_2 runs every 4 seconds (I was thinking of it as: while(true) { execute Block_2; sleep(4s); })?
And would the lifecycle of the DStream windowStream be like the following?
@t_0 -> @t_7: no window completed, Block_2 does not execute.
@t_8: execute Block_2 with windowStream = [RDD@t_0->8]
@t_12: execute Block_2 with windowStream = [RDD@t_0->8, RDD@t_4->12]
@t_16: execute Block_2 with windowStream = [RDD@t_0->8, RDD@t_4->12, RDD@t_8->16]

Similar to the explanation above, Block_2 is executed only once, but it sets up the relevant operations to be executed on each window of data after streamingContext.start(). The RDDs of the windowStream at different times will be as follows (a sketch of a more idiomatic windowed count follows the timeline):
@t_0: nothing
@t_4: [inputStream's RDD@t_0->4]
@t_8: [inputStream's RDD@t_0->4] and [inputStream's RDD@t_4->8]
@t_12: [inputStream's RDD@t_4->8] and [inputStream's RDD@t_8->12]
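By the way, if what you ultimately want from Block_2 is a per-URL count over the last 8 seconds (rather than a cumulative count since the start), the more idiomatic way is reduceByKeyAndWindow instead of window followed by a running reduce. Under the same assumptions and imports as the earlier sketch:

// Per-URL counts over the last 8 seconds, recomputed every 4 seconds.
val windowedCounts = inputStream
  .map(url => (url, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(8), Seconds(4))

windowedCounts.print()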
 

3. About the real-time data store
I have read a paper about a real-time datastore named Druid (from Metamarkets, if I remember correctly).
In short, Druid = streaming engine (Kafka + Storm) + ZooKeeper + real-time nodes + historical nodes (it is said that Druid can query 6 TB of in-memory data in 1.4 seconds).
The question is: if I want to build a real-time data store solution on the Spark platform (or BDAS), what is the recommended model?


To develop a real-time data store solution using BDAS, at a high level, you will have to do something like the following. 

Using Spark Streaming, you receive the data from some source (Kafka, etc.) in small batches and store it in Spark's memory or in Tachyon. You would have to figure out how much data (1 hour, 2 hours, etc.) you want to keep in memory, and accordingly assign hardware resources and design the window operations. Then you can either run continuous queries by setting up the appropriate transformations on the DStreams, or run on-demand Spark computations or Shark SQL queries on this in-memory data.
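As a very rough sketch of just the windowing/caching piece (a socket source stands in for Kafka here, and the Tachyon / Shark integration is omitted):

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "InMemoryStore", Seconds(2))

// Ingest: a socket source stands in for Kafka in this sketch.
val events = ssc.socketTextStream("localhost", 9999)

// Retain the last hour of events, sliding every minute, and cache the
// windowed RDDs so that queries over them hit memory.
val lastHour = events.window(Minutes(60), Minutes(1))
lastHour.persist()

// A continuous query over the retained hour, e.g. a simple count.
lastHour.count().print()

ssc.start()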

ngoc linh

Jan 3, 2014, 6:06:41 AM1/3/14
to spark...@googlegroups.com
I got it all; your posts are really helpful.
Thanks very much.