The general structure of a windowed Flink program is presented below. The first snippet refers to keyed streams, while the second refers to non-keyed ones. As one can see, the only difference is the keyBy(...) call for the keyed streams and the window(...) call, which becomes windowAll(...) for non-keyed streams. This is also going to serve as a roadmap for the rest of the page.
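As a rough outline of the two shapes just described, the call chains look approximately like this. Bracketed calls are optional, the annotations on the right are explanatory only, and the outline is not meant to compile as written.

```
stream
    .keyBy(...)                     <- keyed versus non-keyed windows
    .window(...)                    <- required: assigner
   [.trigger(...)]                  <- optional: trigger (else default trigger)
   [.evictor(...)]                  <- optional: evictor (else no evictor)
   [.allowedLateness(...)]          <- optional: lateness (else zero)
    .reduce/aggregate/process(...)  <- required: window function

stream
    .windowAll(...)                 <- required: assigner
   [.trigger(...)]                  <- optional: trigger (else default trigger)
   [.evictor(...)]                  <- optional: evictor (else no evictor)
   [.allowedLateness(...)]          <- optional: lateness (else zero)
    .reduce/aggregate/process(...)  <- required: window function
```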
In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners). For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the 12:06 timestamp.
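The arithmetic behind the 12:00 to 12:06 example can be sketched in plain Java. This is only a model of the rule stated above (removal once time passes the window end plus allowed lateness), not Flink code; the class and method names are hypothetical, and times of day stand in for real timestamps.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Illustrative model of when a tumbling window is created and removed. */
class WindowLifecycleSketch {

    /** Start of the tumbling window that contains the given time of day. */
    static LocalTime windowStart(LocalTime timestamp, Duration size) {
        long second = timestamp.toSecondOfDay();
        long sizeSeconds = size.getSeconds();
        return LocalTime.ofSecondOfDay(second - (second % sizeSeconds));
    }

    /** Time after which the window is removed: window end plus allowed lateness. */
    static LocalTime cleanupTime(LocalTime timestamp, Duration size, Duration allowedLateness) {
        return windowStart(timestamp, size).plus(size).plus(allowedLateness);
    }

    public static void main(String[] args) {
        LocalTime firstElement = LocalTime.of(12, 2, 30);
        Duration size = Duration.ofMinutes(5);
        Duration lateness = Duration.ofMinutes(1);
        System.out.println("window start: " + windowStart(firstElement, size));
        System.out.println("removed after: " + cleanupTime(firstElement, size, lateness));
    }
}
```

An element stamped 12:02:30 therefore opens the 12:00 window, and that window becomes eligible for removal once the watermark passes 12:06.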
In the following we go into more detail for each of the components above. We start with the required parts in the above snippet (see Keyed vs Non-Keyed Windows, Window Assigners, and Window Functions) before moving to the optional ones.
The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window. Using keyBy(...) will split your infinite stream into logical keyed streams. If keyBy(...) is not called, your stream is not keyed.
In the case of keyed streams, any attribute of your incoming events can be used as a key (more details here). Having a keyed stream will allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed independently from the rest. All elements referring to the same key will be sent to the same parallel task.
After specifying whether your stream is keyed or not, the next step is to define a window assigner. The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(...) (for keyed streams) or the windowAll(...) (for non-keyed streams) call.
A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time. Please take a look at our section on event time to learn about the difference between processing time and event time and how timestamps and watermarks are generated.
Time-based windows have a start timestamp (inclusive) and an end timestamp (exclusive) that together describe the size of the window. In code, Flink uses TimeWindow when working with time-based windows, which has methods for querying the start and end timestamps and an additional method maxTimestamp() that returns the largest allowed timestamp for a given window.
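The inclusive-start, exclusive-end convention can be illustrated with a small stand-in class. This is a sketch and not Flink's TimeWindow: it assumes millisecond timestamps, takes maxTimestamp() to be end minus one (which follows from the exclusive end), and adds a hypothetical contains(...) helper for illustration.

```java
/** Minimal stand-in for a time window covering [start, end) in milliseconds. */
final class TimeWindowSketch {
    private final long start; // inclusive
    private final long end;   // exclusive

    TimeWindowSketch(long start, long end) {
        if (end <= start) {
            throw new IllegalArgumentException("end must be greater than start");
        }
        this.start = start;
        this.end = end;
    }

    long getStart() { return start; }

    long getEnd() { return end; }

    /** Largest timestamp that still belongs to this window. */
    long maxTimestamp() { return end - 1; }

    /** Hypothetical helper: true if the timestamp falls inside [start, end). */
    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }

    public static void main(String[] args) {
        TimeWindowSketch window = new TimeWindowSketch(0L, 300_000L);
        System.out.println(window.maxTimestamp());
        System.out.println(window.contains(300_000L));
    }
}
```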
A tumbling windows assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.
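To make the assignment rule concrete, here is a minimal sketch that buckets timestamps into fixed, non-overlapping windows. It assumes millisecond timestamps and windows aligned to the epoch; TumblingSketch and its methods are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative tumbling-window assignment: each element lands in exactly one window. */
class TumblingSketch {

    /** Start of the window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Groups timestamps by window start, ordered by window. */
    static Map<Long, List<Long>> assign(List<Long> timestamps, long size) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        List<Long> timestamps = Arrays.asList(0L, 299_999L, 300_000L, 610_000L);
        System.out.println(assign(timestamps, fiveMinutes));
    }
}
```

The timestamps 0 and 299999 share the first five-minute window, while 300000 is the first timestamp of the next one.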
The sliding windows assigner assigns elements to windows of fixed length. Similar to a tumbling windows assigner, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a sliding window is started. Hence, sliding windows can be overlapping if the slide is smaller than the window size. In this case elements are assigned to multiple windows.
For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get a window every 5 minutes that contains the events that arrived during the last 10 minutes, as depicted by the following figure.
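The overlap can be sketched by listing every window that contains a given timestamp. This is an illustrative model with epoch-aligned windows and hypothetical names, not Flink's assigner; the units are arbitrary (minutes in the example).

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sliding-window assignment: an element may belong to several windows. */
class SlidingSketch {

    /** Returns the start of every window [start, start + size) that contains the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10, slide 5: minute 12 falls into the windows starting at 10 and at 5.
        System.out.println(windowStarts(12L, 10L, 5L));
    }
}
```

With a size of 10 and a slide of 5, each element belongs to two windows. If the slide were larger than the size, some timestamps would belong to no window at all.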
The session windows assigner groups elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time, in contrast to tumbling windows and sliding windows. Instead a session window closes when it does not receive elements for a certain period of time, i.e., when a gap of inactivity occurred. A session window assigner can be configured with either a static session gap or with a session gap extractor function which defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
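A static session gap can be modelled as follows. This sketch assumes the timestamps are already sorted and belong to one key, and it treats an element arriving exactly one gap after the previous one as part of the same session, which is an assumption about boundary handling. SessionSketch is a hypothetical name and this is not how Flink implements session merging.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative session grouping with a fixed inactivity gap. */
class SessionSketch {

    /**
     * Groups sorted timestamps into sessions. Each session is returned as
     * {start, end}, where end is the last element's timestamp plus the gap.
     */
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result;
        }
        long start = sortedTimestamps[0];
        long last = sortedTimestamps[0];
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - last > gap) {
                // The inactivity period expired: close the current session.
                result.add(new long[] {start, last + gap});
                start = ts;
            }
            last = ts;
        }
        result.add(new long[] {start, last + gap});
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {0, 2, 3, 20, 21}, 5)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```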
A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, as the global window does not have a natural end at which we could process the aggregated elements.
After defining the window assigner, we need to specify the computation that we want to perform on each of these windows. This is the responsibility of the window function, which is used to process the elements of each (possibly keyed) window once the system determines that a window is ready for processing (see triggers for how Flink determines when a window is ready).
The window function can be one of ReduceFunction, AggregateFunction, or ProcessWindowFunction. The first two can be executed more efficiently (see State Size section) because Flink can incrementally aggregate the elements for each window as they arrive. A ProcessWindowFunction gets an Iterable for all the elements contained in a window and additional meta information about the window to which the elements belong.
A windowed transformation with a ProcessWindowFunction cannot be executed as efficiently as the other cases because Flink has to buffer all elements for a window internally before invoking the function. This can be mitigated by combining a ProcessWindowFunction with a ReduceFunction or AggregateFunction to get both incremental aggregation of window elements and the additional window metadata that the ProcessWindowFunction receives. We will look at examples for each of these variants.
An AggregateFunction is a generalized version of a ReduceFunction that has three types: an input type (IN), accumulator type (ACC), and an output type (OUT). The input type is the type of elements in the input stream and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one accumulator and for extracting an output (of type OUT) from an accumulator. We will see how this works in the example below.
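The four operations just described can be sketched with a stand-in interface that computes an average. The Aggregate interface below is a hypothetical local type that mirrors the IN/ACC/OUT description, not Flink's AggregateFunction itself, and the aggregate(...) driver only imitates what a window operator would do as elements arrive.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative incremental average using an IN/ACC/OUT style aggregate. */
class AverageAggregateSketch {

    /** Hypothetical stand-in for the interface described in the text. */
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        ACC merge(ACC a, ACC b);
        OUT getResult(ACC accumulator);
    }

    /** IN = Long, ACC = long[]{sum, count}, OUT = Double. */
    static final class Average implements Aggregate<Long, long[], Double> {
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }
        public long[] add(Long value, long[] acc) {
            return new long[] {acc[0] + value, acc[1] + 1};
        }
        public long[] merge(long[] a, long[] b) {
            return new long[] {a[0] + b[0], a[1] + b[1]};
        }
        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }
    }

    /** Feeds elements one at a time, keeping only the accumulator as state. */
    static <IN, ACC, OUT> OUT aggregate(Aggregate<IN, ACC, OUT> fn, List<IN> elements) {
        ACC acc = fn.createAccumulator();
        for (IN element : elements) {
            acc = fn.add(element, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregate(new Average(), Arrays.asList(1L, 2L, 3L, 6L)));
    }
}
```

Only the two-element accumulator is kept per window, regardless of how many elements arrive, which is why this style is cheaper than buffering every element.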
A ProcessWindowFunction gets an Iterable containing all the elements of the window, and a Context object with access to time and state information, which enables it to provide more flexibility than other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing.
The key parameter is the key that is extracted via the KeySelector that was specified for the keyBy() invocation. In case of tuple-index keys or string-field references this key type is always Tuple and you have to manually cast it to a tuple of the correct size to extract the key fields.
A ProcessWindowFunction can be combined with either a ReduceFunction or an AggregateFunction to incrementally aggregate elements as they arrive in the window. When the window is closed, the ProcessWindowFunction will be provided with the aggregated result. This allows it to incrementally compute windows while having access to the additional window meta information of the ProcessWindowFunction.
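The idea of pairing an incremental reduce with a final step that sees window metadata can be modelled in a few lines. This is a plain-Java sketch with hypothetical names, not the Flink API: the add(...) method plays the role of the incremental reduce (here, a running minimum) and close() plays the role of the function that receives the single aggregated value together with the window bounds.

```java
/** Illustrative model: incremental minimum plus window metadata at close time. */
class IncrementalWithMetadataSketch {
    private final long windowStart;
    private final long windowEnd;
    private Long min; // the only per-window state: the running minimum

    IncrementalWithMetadataSketch(long windowStart, long windowEnd) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    /** Incremental step: fold each arriving element into the running minimum. */
    void add(long value) {
        min = (min == null) ? value : Math.min(min, value);
    }

    /** Final step: combine the aggregated value with the window's metadata. */
    String close() {
        return "window [" + windowStart + ", " + windowEnd + ") min=" + min;
    }

    public static void main(String[] args) {
        IncrementalWithMetadataSketch window = new IncrementalWithMetadataSketch(0L, 10L);
        window.add(5L);
        window.add(3L);
        window.add(9L);
        System.out.println(window.close());
    }
}
```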
Per-window state is tied to an actual window instance for a given key rather than to the window definition. This means that if we process events for 1000 different keys and events for all of them currently fall into the [12:00, 13:00) time window, then there will be 1000 window instances that each have their own keyed per-window state.
This feature is helpful if you anticipate multiple firings for the same window, as can happen when you have late firings for data that arrives late or when you have a custom trigger that does speculative early firings. In such a case you would store information about previous firings or the number of firings in per-window state.
In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advanced features, such as per-window keyed state. This interface will be deprecated at some point.
A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).
Once a trigger determines that a window is ready for processing, it fires, i.e., it returns FIRE or FIRE_AND_PURGE. This is the signal for the window operator to emit the result of the current window. Given a window with a ProcessWindowFunction, all elements are passed to the ProcessWindowFunction (possibly after passing them to an evictor). Windows with a ReduceFunction or AggregateFunction simply emit their eagerly aggregated result.
When a trigger fires, it can either FIRE or FIRE_AND_PURGE. While FIRE keeps the contents of the window, FIRE_AND_PURGE removes its content. By default, the pre-implemented triggers simply FIRE without purging the window state.
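The difference between the two results can be modelled with a small buffer. This sketch is not Flink code: TriggerResultSketch and its methods are hypothetical, the window simply emits the sum of its contents, and a CONTINUE value (which the text above does not discuss) is included only to represent the case where the trigger does not fire.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of FIRE versus FIRE_AND_PURGE on buffered window contents. */
class TriggerResultSketch {

    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final List<Integer> contents = new ArrayList<>();

    void add(int element) {
        contents.add(element);
    }

    int size() {
        return contents.size();
    }

    /** Applies a trigger decision; returns the emitted sum, or null if nothing fired. */
    Integer onTrigger(Result result) {
        if (result == Result.CONTINUE) {
            return null;
        }
        int sum = 0;
        for (int element : contents) {
            sum += element;
        }
        if (result == Result.FIRE_AND_PURGE) {
            contents.clear(); // purge: the next firing starts from an empty window
        }
        return sum;
    }

    public static void main(String[] args) {
        TriggerResultSketch window = new TriggerResultSketch();
        window.add(1);
        window.add(2);
        System.out.println(window.onTrigger(Result.FIRE));           // contents kept
        window.add(4);
        System.out.println(window.onTrigger(Result.FIRE_AND_PURGE)); // contents cleared
        System.out.println(window.size());
    }
}
```

After a plain FIRE, a later firing still includes the earlier elements; after FIRE_AND_PURGE, it would only see elements that arrived afterwards.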
The default Trigger of a WindowAssigner is appropriate for many use cases. For example, all the event-time window assigners have an EventTimeTrigger as default trigger. This trigger simply fires once the watermark passes the end of a window.