主要内容:

  • 从SocketSource接收数据,时间语义采用EventTime,统计用户在T分钟内的总消费金额。

1 方案

​ 1.创建记录用户消费的ConsumerMess类:包括三个属性:用户ID、消费金额、消费时间

case class ConsumerMess(userId:Int, spend:Double, Time:Long)

​ 2.要求采用EventTime时间语义,可通过以下设置:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

​ 3.考虑到数据存在乱序的可能性,需要使用Watermark机制处理乱序数据,并设置允许的最大延迟时间。

  • 使用固定时间间隔的Timestamp Assigner指定时间戳(ConsumerMess.Time)和Watermark(延迟2s)。
  • 使用翻滚窗口,窗口长度为T=10s。
data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ConsumerMess](Time.seconds(2)) {
        override def extractTimestamp(element: ConsumerMess): Long = {
          element.Time
        }
      })
      .keyBy(_.userId)
      .timeWindow(Time.seconds(10))

2 代码

package org.ourhome.streamapi

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * @Author Do
 * @Date 2020/4/25 21:33
 */
object EventTImeTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val socketData: DataStream[String] = env.socketTextStream("192.168.237.128", 9999)
    socketData.print("input ")

    socketData.map(line => {
      val str: Array[String] = line.split(",")
      ConsumerMess(str(0).toInt, str(1).toDouble, str(2).toLong)
    })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ConsumerMess](Time.seconds(2)) {
        override def extractTimestamp(element: ConsumerMess): Long = {
          element.Time
        }
      })
      .keyBy(_.userId)
      .timeWindow(Time.minutes(10))
      .reduce((a, b) => ConsumerMess(a.userId, a.spend + b.spend, b.Time))
      .print("out ")

    env.execute()

  }

  case class ConsumerMess(userId:Int, spend:Double, Time:Long)

}

3 测试

Socket发送数据:

第一个字段代表用户123,第二个字段为用户消费金额,第三个字段时间戳。

为方便测试,这里从2020-04-25 12:00:01(1587787201000)开始记录第一条数据,且第1分钟消费100,第2分钟消费200……以此类推:

123,100,1587787201000     2020-04-25 12:00:01
123,400,1587787204000     2020-04-25 12:00:04
123,300,1587787203000     2020-04-25 12:00:03
123,500,1587787205000     2020-04-25 12:00:05
123,200,1587787202000     2020-04-25 12:00:02
123,600,1587787206000     2020-04-25 12:00:06
123,800,1587787208000     2020-04-25 12:00:08
123,700,1587787207000     2020-04-25 12:00:07
123,1000,1587787210000    2020-04-25 12:00:10
123,900,1587787209000     2020-04-25 12:00:09
123,1100,1587787211000    2020-04-25 12:00:11
123,1200,1587787212000    2020-04-25 12:00:12

可以观察到数据是乱序到达Flink。启动Flink程序后,1到11分的数据进入窗口,并以进入的顺序被打印:(input为打印的输入数据)

input > 123,100,1587787201000
input > 123,400,1587787204000
input > 123,300,1587787203000
input > 123,500,1587787205000
input > 123,200,1587787202000
input > 123,600,1587787206000
input > 123,800,1587787208000
input > 123,700,1587787207000
input > 123,1000,1587787210000
input > 123,900,1587787209000
input > 123,1100,1587787211000
input > 123,1200,1587787212000

123,1200,1587787212000这条数据进入窗口,其携带的时间戳为15877872120002020-04-25 12:00:12),延迟时间参数为2s,此时Watermark更新为12-2=10,也就是这条数据对应的Watermark是10s(Watermark(10)),就意味着10s前的数据都已经到了,且窗口内有数据,所以可以触发窗口计算并销毁窗口:(output为打印的结果)

output > ConsumerMess(123,4500.0,1587787209000)

打印出的结果显示,用户123窗口期内共消费4500元。通过计算可知4500是9分钟内消费的金额,但是我们的窗口长度明明是10,这是否正确呢?答案是正确的!

4 timeWindow 窗口边界问题

4.1 getWindowStartWithOffset解析

对于上述现象,自然而言有个疑问:窗口的起始时间是如何确定的呢???顺着源码看一看:
打开timeWindow函数源码,其调用的是javaStream的timeWindow 方法:

def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
    new WindowedStream(javaStream.timeWindow(size))
  }

接着,看一下timeWindow其内部。由于我们使用的是EventTIme时间语义,所以调用的是TumblingEventTimeWindows.of(size)方法:

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}

这里有个start变量,也就是窗口的起始时间,start+size即窗口的结束时间。起始时间的计算调用的是TimeWindow.getWindowStartWithOffset方法:

@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		if (timestamp > Long.MIN_VALUE) {
			// Long.MIN_VALUE is currently assigned when no timestamp is present
			long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
			return Collections.singletonList(new TimeWindow(start, start + size));
		} else {
			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
					"'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
	}

最后来看看getWindowStartWithOffset,该方法有3个参数:

  • timestamp:启动窗口的时间,单位毫秒
  • offset:时间的偏移量,针对时区问题
  • windowSize:窗口长度
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
		return timestamp - (timestamp - offset + windowSize) % windowSize;
	}

现在,我们根据案例验证一下窗口的起始时间:

timestamp = 1587787201000  (2020-04-25 12:00:01)
offset = 0
windowSize = 10000
带入公式:
start = timestamp - (timestamp - offset + windowSize) % windowSize
得:
start = 1587787200000   (2020-04-25 12:00:00)
end = start + windowSize = 1587787210000   (2020-04-25 12:00:10)

所以:

  • 从2020-04-25 12:00:00开始计算的滚动窗口的起始时间是从2020-04-25 12:00:00到2020-04-25 12:00:10(注意:左闭右开!),也就是[12:00:00, 12:00:10)
  • 再加上watermark要求的延迟2s,也就是[12:00:00, 12:00:12)
    即第一次窗口统计的0-9分钟的数据,也就解释了上一个部分的结果4500的合理性了!

4.2 继续验证

继续输入:

123,1300,1587787213000
123,1500,1587787215000
123,1700,1587787217000
123,2000,1587787220000
123,2100,1587787221000
123,2200,1587787222000

输出:

input > 123,1300,1587787213000
input > 123,1500,1587787215000
input > 123,1700,1587787217000
input > 123,2000,1587787220000
input > 123,2100,1587787221000
input > 123,2200,1587787222000
output > ConsumerMess(123,7800.0,1587787217000)

分析:
按照上面的分析,此次窗口的起始时间应该为[12:00:10, 12:00:20),那么总消费金额应该为:1000+1100+1200+1300+1500+1700=7800,跟输出的结果是一致的!也就是说进入第一个窗口的12:00:10和12:00:11的数据会在第二个窗口触发计算时得到正确的计算,同时未进入第二个窗口的12:00:14、12:00:16、12:00:18和12:00:19的数据将会被遗弃。