Flink-窗口计算

Posted by kevinkang on September 24, 2023

为什么需要窗口计算

无界数据集统计,通过一个全局窗口统计,是不现实的

只有通过一些窗口的范围, 去签订出来一些有界的数据集, 基于这些有界数据集, 去统计出来一些结果, 最终汇总到我们的系统, 这其实是解决无界数据集统计的一种方式

  • 每5分钟统计一次,统计当前5分钟以内数据中的最大值,出现次数以及 Sum 值
    img
  • 【窗口计算】:
    • 对无界数据集进行有界处理的过程
    • 并通过对窗口上统计, 产生对应输出的结果
      </br>
  • 每3个 event 统计一次,统计3个 event 中数字出现最大值?最小值?Sum 值?
    img

window 应用场景


img

  • 聚合统计: 对数据进行聚合操作(1分钟、5分钟聚合操作等),写入到外围数据库中
  • 记录合并: 对多个kafka的数据源在一定时间窗口内,进行数据合并(如一些用户行为数据,进行合并,减小下游及es写入压力),写入到es中
  • 双流join: 两条流在窗口上面进行join, 数据量会降低在窗口范围内, 聚合后数据写入到kafka里面去
  • Watermark 本身也属于特殊的事件;

window 抽象概念


img

  • flink中窗口会抽象成不同的概念
  • 数据从dataStream接进来的时候, 会去抽取它的 timeStamp 和 waterMark
    • 我们可以看到, 对timeStamp 的获取
  • 对 timeStamp 进行keyBy的操作,生成 keyedSteam 和 DataStream
    • keyedSteam: 把key提取出来, 分成不同的分区, 写入不同的分区
  • 对 keyedSteam 进行一个 window 操作, 生成一个 windowedStream
  • windowAssigner: 根据我们输入的数据集(数据记录), 将数据记录划分成不同的窗口
    • 控制窗口的类型, 时间类型为窗口、SlidingWindow 滑动窗口、滚动窗口、Session 窗口
  • Trigger(可选组件): 控制窗口何时触发
    • 根据不同的窗口类型去选择相应的 window 触发的策略
  • Evictor(可选组件): 数据剔除器, 窗口函数计算之前、计算之后, 对满足条件的一些数据进行相应过滤操作
    • 如需要将符合条件的数据,写入到我们的window Function里面, 通过 Evictor 控制, 剔除不需要数据
  • window Function(核心组件): 窗口函数, 主要用于对窗口内的数据做计算
    • 包括需要对窗口数据,怎样生成对应的统计结果, 那么所有的统计策略, 以及统计的方法, 都是在 window Function 进行定义
  • SideOutput: 与window Function相连, 对数据的输出
    • 可以通过SideOutput Tag去控制数据如何输出到外围, 下游的 DataStream 里面去

window 编程接口


img

Window 组件介绍

Window Assigner

  • Flink 窗口的骨架结构中有两个必须的两个操作:
    • 使用窗口分配器(WindowAssigner)将数据流中的元素分配到对应的窗口。
    • 当满足窗口触发条件后,对窗口内的数据使用窗口处理函数(Window Function)进行处理,常 用的 Window Function 有 reduce、aggregate、process。
      img

Flink支持的窗口类型


img

Sliding Window(滑动窗口)


img

  • 滑动窗口以一个步长(Slide)不断向前滑动, 窗口的长度固定
    • Window Size:窗口大小
    • Window Slide:滑动间隔
  • 数据可以被重复计算,取决于 Size 和 Slide Time
    • Slide Time < Window Size 数据多个窗口中统计
    • Slide Time > Window Size 数据可能不再任何一个 Window中
  • 应用非常广泛
    • 每隔5 min 统计前10 min 的总数

Tumbliing Window(滚动窗口)


img

  • 滚动窗口下窗口之间之间不重叠, 且窗口长度是固定的
    • 特殊的滑动窗口
    • Window size = Window Slide
    • 应用实例:
      • 每5分钟统计一次用户总的成交额

Session Window


img

  • 根据 Session gap 切分不同的窗口
  • 当一个窗口在大于 Session gap 的时间内没有接收到新数据时,窗口关闭。
  • Window Size 可变

Global Window


img

  • 前面不同类型窗口的基础, 通过添加不同的Trigger实现不同类型窗口
  • 只有一个窗口, 没有Trigger触发器
  • 这个窗口的触发操作, 由用户自己去指定, 窗口如何切分(由用户指定)

Predefined Keyed Windows

// Tumbling time window
keyedStream.timeWindow(Time.minutes(1))

// Sliding time window
keyedStream.timeWindow(Time.minutes(1), Time.seconds(10))

// Tumbling count window
keyedStream.countWindow(100)
        
// Sliding count window
keyedStream.countWindow(100, 10)

// Session window
keyedStream.window(EventTimeSessionWindows.withGap(Time. seconds(3))

Predefined Keyed Windows 实例

DataStream<T> input = ...;

// sliding event-time windows
input.keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

// sliding processing-time windows
input.keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours, 涉及到时区
input.keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>);

Predefined Non-keyed Windows

stream.windowAll()
        
stream.timeWindowAll(Time.seconds(10))
        
stream.countWindowAll(20, 10)

Window Trigger

  • 触发器(Trigger)决定了何时启动 Window Function 来处理窗口中的数据以及何时将窗口内的数据清理。
    img

Window Trigger 触发机制


img
img

  • 整体介绍
    • 左边为接入的数据, 从12:00~12:10数据,对应waterMark为2分钟
    • 这时候我们会通过一个窗口分配器(window assigner)分配对应的窗口, 调用tumblingEventTimeWindows, 指定窗口大小为5分钟
    • 12:00第一条数据进入窗口,窗口有一个状态的维护,也就是说我们接入的数据,会先维系在窗口的状态里面
      • 基于窗口之上,进行状态的统计
      • 在窗口之上,有一个 EventTimeTrigger,对于其来讲,会去控制我们窗口window Function的计算, 以及结果的输出, 包括window Result的输出
  • 数据流程介绍
    • 当第一条数据进来之后,waterMark的时间是11:58 (12:00 - 2min)
    • 当第二条数据进来后, 窗口同样也会将接入的数据进行维护, 我们在窗口统计过程中, 窗口内会进行累加的计算, 累加1和2, 产生状态的统计结果。且waterMark更新为 12:02 对于EventTimeTrigger还是没有触发窗口结果的输出
    • 12:03第三条数据接入, 在窗口内对状态进行存储,这时我们看到, 窗口计算12:00~12:05,状态累计为6,且当前时间未超过上一个事件事件,waterMark不需要进行更新, EventTimeTrigger还是没有触发窗口结果的输出
    • 12:08第四条数据接入, 会把waterMark的事件更新到12:06, 此时 > 12:05 (窗口结束事件), 也就是说我们12:00~12:05这个窗口的数据, 可以进行相应的统计操作, 即 EventTimeTrigger 会对窗口结果输出到 window Result里面。外面的算子就可以接收到12:00~12:05的统计结果6


img

  • 整体介绍
    • 如果想让窗口每隔两分钟就触发一次结果输出呢?而不是依赖 waterMark > window endTime来控制结果输出
    • 这时候可以选择使用 ContinuousEventTimeTrigger, 需要指定一个事件间隔,设置为2分钟, 即eventTime每隔2分钟就会触发一次窗口统计
  • 数据流程介绍
    • 比如说对于第二条数据接入后, 12:04-12:00 = 4min > 2min, 对于 ContinuousEventTimeTrigger就会触发一次窗口的输出, 对于12:00~12:05的结果就会进行一次output
    • 第三条数据12:03接入进来后, 此时waterMark无需更新, 且事件时间间隔 < 2min, 不会触发窗口的计算
    • 第三条数据12:08接入进来后, 此时waterMark更新到12:06, 且12:08-12:04 = 4min > 2min, 对于
      • 【update mode】ContinuousEventTimeTrigger就会触发一次窗口的输出, 此时是一种 update mode,会把前面的输出结果进行一次更新 结果为 1+2+3 = 6 对于第二个窗口12:05~12:10, 此时也满足, 也会进行窗口的输出

      • img

      • img
      • 【append mode】对于在append mode下, 每一次ContinuousEventTimeTrigger触发了窗口结果的统计输出时, 是一种append模式, window result中的状态, 是不断在append
        • 在第一次触发之后产生的结果、第二次触发产生的结果, 都会append到window下游的算子中, 如果此时下游不清楚产生的结果, 就可能会把结果进行累加, 即(3+6), 但其实这并不是我们想要的结果, 导致下游的算子统计上就会有一些问题
          • append mode问题处理:flink中提出了puringTrigger的接口, 可以在 ContinuousEventTimeTrigger 之上进行一次封装,达到的一个效果是, 可以对我们窗口里面的状态进行清除。
          • 即窗口触发完成后, 会对window状态进行清理, 从而保证下游接收到的window Result可以有一个正确的结果
          • 在12:03以及12:08的两台数据来说,此时12:00~12:05两次都是3,因为对状态进行了清理, 保证下游累加正确性

Window Trigger 总结

  • 通常情况下WindowAssigner 中已经默认包含相应的Window Trigger,且适用于大部分场景;
  • 所有基于Event Time 时间的 WindowAssigner 默认Trigger 都是 EventTimeTrigger;
  • GlobalWindow默认为 NeverTrigger,也就是不触发,因此使用 GlobalWindow 必须指定 CustomTrigger,否则窗口永远不触发;
  • 通过 WindowStream 提供的 trigger() 方法能够接受新的 Trigger 实现,但是会覆盖WindowAssigner 自带的默认 Triigger;

Window Evictor

  • 数据清除器(Evictor)
  • 可选组件
    img
  • processing Function 的前后, 都可以使用Window Trigger

Window Evictor 多种介绍


img

Window Evictor 使用

windowStream.evictor(CountEvictor.of(5))

windowStream.evictor(TimeEvictor.of(Time.of(5, TimeUnit.SECONDS)))

windowStream.evictor(DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String,
Integer>>() {
@Override
public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String,
Integer> newDataPoint) {
return newDataPoint.f1 - oldDataPoint.f1;
}}, evictAfter),0,null))

Window Evictor 接口定义

public interface Evictor<T, W extends Window> extends Serializable {
  /**
   * Optionally evicts elements. Called before windowing function.
   */
  void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
  /**
   * Optionally evicts elements. Called after windowing function.
   */
  void evictAfter(Iterable<TimesstampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
  /**
   * A context object that is given to {@link Evictor} methods.
   */
  interface EvictorContext {
    /**
     * Returns the current processing time.
     */
    long getCurrentProcessingTime();
    MetricGroup getMetricGroup();
    /**
     * Returns the current watermark time.
     */
    long getCurrentWatermark();
  } 
}