主要内容:

  • 介绍Time、Window、Watermark的基本概念
  • 介绍几个实现案例

1 Flink时间语义

时间属性是流处理中最重要的一个方面,是流处理系统的基石之一。Flink作为一个先进的分布式流处理引擎,支持不同的时间语义:

  • Event Time:事件生成时间,可根据每一条处理记录所携带的时间戳来判定。Flink通过时间戳分配器获取改时间。
  • Ingestion Time:事件接入事件,指数据接入Flink DataSource的时间。
  • Processing Time:事件处理时间,指执行转换操作的算子所在的服务器时间。Flink默认的时间语义。

其核心是 Processing Time 和 Event Time。

代码中选择不同时间语义的方式:

// Event Time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// Ingestion Time
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// Processing Time
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

2 Window

Window 是无限数据流处理的核心。通过窗口将无限数据流切割为有限数据流,然后对窗口内的数据进行聚合操作,如计算一个小时内有多少用户点击了口网站。

按照是否对DataStream中的Key进行分组,可将其分为两类:

  • Keyed Window
  • Non-Keyed Window

各自的应用形式如下:(其中Window Assigner和Window Function为必选)

// Keyed Window
stream
       .keyBy(...)               <-  按照一个Key进行分组
       .window(...)              <-  将数据流中的元素分配到相应的窗口中 Assigner
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分组,将数据流中的所有元素分配到相应的窗口中
      [.trigger(...)]            <-  指定触发器Trigger(可选)
      [.evictor(...)]            <-  指定清除器Evictor(可选)
       .reduce/aggregate/process()      <-  窗口处理函数Window Function

其中涉及到几种数据类型的转换:

Flink支持两种类型的窗口:(其核心是TimeWindow)

  • TimeWindow:基于时间生成窗口。将制定时间范围内的所有数据组成一个window,一次对一个window里面所有的数据进行计算。如每20s生成一个窗口。
  • CountWindow:基于元素数量(相同Key)生成窗口,与时间无关,如每20个元素生成一个窗口。

Window Assigner

TimeWindow根据实现原理可以分为三类:

  • Tumbling Window:滚动窗口,将数据依据固定的窗口长度进行切片。特点是时间对齐、窗口长度固定、没有重复。
val inputStream: DataStream[T] = ...

val tumbling: DataStream[T] = inputStream
  .keyBy(0) 
  // 窗口长度5s
  .timeWindow(Time.minutes(5))
  .process(...)
  • Sliding Window:滑动窗口,由固定的窗口长度和滑动间隔组成。特点是时间对齐、窗口长度固定、有重叠。
val inputStream: DataStream[T] = ...

val sliding: DataStream[T] = inputStream
  .keyBy(0) 
  // 窗口长度5s,滑动间隔1s,每隔1s计算前5秒的数据
  .timeWindow(Time.seconds(5), Time.seconds(1)) 
  .process(...)
  • Session Window:会话窗口,在规定的时间内(Session Gap)如果没有数据接入,则认为窗口结束。特点:时间无对齐。
val inputStream: DataStream[T] = ...

val session: DataStream[T] = inputStream
  .keyBy(0) 
  // 如果10min没有数据接入就结束当前窗口
  .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  .process(...)

timeWindow使用

v.timeWindow()是简写:

.window(SlidingEventTimeWindows.of(size))
.window(SlidingProcessingTimeWindows.of(size))

或:

.window(TumblingEventTimeWindows.of(size))
.window(TumblingProcessingTimeWindows.of(size))

源码中timeWindow的逻辑:

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}

具体是Sliding,还是Tumbling类型窗口,根据参数个数确定:

  • 一个参数:Tumbling
  • 两个参数:Sliding

以上两个参数可以用org.apache.flink.streaming.api.windowing.time.Time中的secondsminuteshoursdays来设置。

具体是EventTime,还是ProcessingTime时间语义,可以根据第一部分中的代码进行设置。

Window Function

对数据集定义了Window Assigner后,数据被分配到不同的窗口里,接下来通过窗口函数,在每个窗口上对窗口内的数据进行处理。窗口函数主要分为两种:

  • 增量计算:指的是窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据,再保存到窗口中。计算性能较高,占用存储空间较少,窗口中只维护了中间结果状态值,不需要缓存原始数据。如RreduceFunctionAggregateFunction
  • 全量计算:指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算。全量计算的代价较高,性能比较弱,因为算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口触发的时候,对所有的原始数据进行汇总计算。如ProcessWindowFunction

3 Watermark

时间有一个重要的特性:只增不减

对于Processing Time:

使用转换算子所在服务器的时间,假设服务器时钟同步正常,则Processing Time就是有序的数据流。

对于Event Time:

使用绑定在record中的时间,由于网络延迟、程序内部逻辑、或者其他一些分布式系统的原因,数据的时间可能会存在一定程度的乱序。

当使用Event Time时,我们就要考虑如何处理这种乱序情况。处理乱序event的方式就是等待更早event到来,但是不能永久等待下去。我们就需要一种策略,何时停止等待。这就是watermark的作用:何时不再等待更早数据,开始计算窗口内的event。

watermark是一种衡量Event Time进展的机制。watermark是用于处理乱序事件的,而正确的处理乱序事件 ,通常用watermark机制结合window 来实现。可以理解为一个延迟触发机制。watermark是数据本身的一个隐藏属性,数据本身携带着对相应的watermark,因此,如果运行过程中无法获取新的数据,那么窗口将永远无法触发。watermark(t):时间戳t以前的event都已经到了,未来小于等于t的event不会再来,可以触发并销毁窗口了。

上图中,有序数据的watermark为0,无序数据中设置允许的最大延迟到达时间为2s,所以时间戳为6s的事件对应的watermark时4s,时间戳为10s的事件的watermark是8s。如果我们的窗口1是1s4s,窗口2是5s8s,那么时间戳为6s的事件到达时watermark恰好触发窗口1,时间戳为10s的事件到达时的watermark恰好触发窗口2。

注意:

  • 窗口是左闭右开,形式:[window_start_time, window_end_time)
  • window的设定是系统定义好了的,window会一直按照指定的时间间隔进行划分,不论这个window中有没有数据。
  • Flink会根据Event time是否在这个window期间,将数据发送到相应window中。属于这个 window 范围的数据会被不断加入到 window 中,所有未触发的window都会等待触发。只要window没有触发,属于这个window范围的数据就会一直被加入到该window,直到window被触发才会停止数据的追加。当window触发后才接受到的数据会丢失。

Flink提供两种方式指定timestamp生成watremark:

  • SourceFunction:在event源头生成
    • ctx.collectWithTimestamp(element, timestamp) :其中,element:要发送的event,timestamp:event中的时间戳
    • ctx.emitWatermark(new Watermark(timestamp)) : 其中,timestamp:设置最大延迟
class CustomGenerator extends SourceFunction[(Int, Long)] {
    private var running = true
    override def run(ctx: SourceFunction.SourceContext[(Int, Long)]): Unit = {
      // 随机数生成器
      val randomNum: Random = new Random()
      while (running) {
        val n: (Int, Long) = (randomNum.nextInt(3), new Date().getTime())
        // 利用ctx上下文将数据返回
        // 设定timestamp
        ctx.collectWithTimestamp(n, n._2)
        // 生成watermark  最大延时10ms
        ctx.emitWatermark(new Watermark(n._2 - 10))
        Thread.sleep(500)
      }
    }
    override def cancel(): Unit = {
      running = false
    }
  }
  • Timestamp Assigner:Flink自带,根据生成方式分两种:

    • Periodic Watermark:周期性地生成,两种实现:

      • Ascending Timestamp:升序模式,适用于顺序event:
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      socketData.map(line => {
            (line.split(",")(0), line.split(",")(1).toInt)
          })
            // 指定时间字段
            .assignAscendingTimestamps(_._2)
            .keyBy(_._1)
            .timeWindow(Time.minutes(3))
            .sum(0)
            .print()
      
      • Bounded-out-of-orderness:固定延时间隔
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      socketData.map(line => {
            (line.split(",")(0), line.split(",")(1).toInt)
          })
            // 指定时间字段
            .assignTimestampsAndWatermarks(new MyAssigner)
            .peocess(...)
      // 参数为延迟时间
      class MyAssigner extends BoundedOutOfOrdernessTimestampExtractor[(String, Int)](Time.seconds(3)) {
          // 指定时间字段
          override def extractTimestamp(element: (String, Int)): Long = {
            element._2
          }
        }
      
    • Punctuated Watermark:根据接入event的数量生成