从源码中,根据关键的代码,梳理一下Flink中的时间与窗口实现逻辑。
WindowedStream
对数据流执行keyBy()操作后,再调用window()方法,就会返回WindowedStream,表示分区后又加窗的数据流。如果数据流没有经过分区,直接调用window()方法则会返回AllWindowedStream。
如下:
// 构造函数 public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) { this.input = input; this.builder = new WindowOperatorBuilder<>( windowAssigner, windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()), input.getExecutionConfig(), input.getType(), input.getKeySelector(), input.getKeyType()); } // KeyedStream类型,表示被加窗的输入流。 private final KeyedStream<T, K> input; // 用于构建WindowOperator,最终会生成windowAssigner,Evictor,Trigger private final WindowOperatorBuilder<T, K, W> builder;
在这里面还涉及到一些窗口的基本计算算子,比如reduce,aggregate,apply,process,sum等等.
窗口相关模型的实现
Window
Window类是Flink中对窗口的抽象。它是一个抽象类,包含抽象方法maxTimestamp(),用于获取属于该窗口的最大时间戳。
TimeWindow类是其子类。包含了窗口的start,end,offset等时间概念字段,这里会计算窗口的起始时间:
// 构造函数 public TimeWindow(long start, long end) { this.start = start; this.end = end; } // timestamp:获取窗口启动时的第一个时间戳epoch毫秒 public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { final long remainder = (timestamp - offset) % windowSize; // handle both positive and negative cases if (remainder < 0) { return timestamp - (remainder + windowSize); } else { return timestamp - remainder; } }
WindowAssigner
WindowAssigner表示窗口分配器,用来把元素分配到零个或多个窗口(Window对象)中。它是一个抽象类,其中重要的抽象方法为assignWindows()方法,用来给元素分配窗口。
Flink有多种类型的窗口,如Tumbling Window、Sliding Window等。各种类型的窗口又分为基于事件时间或处理时间的窗口。WindowAssigner的实现类就对应着具体类型的窗口。
SlidingEventTimeWindows是WindowAssigner的另一个实现类,表示基于事件时间的Sliding Window。它有3个long类型的字段size、slide和offset,分别表示窗口的大小、滑动的步长和窗口起始位置的偏移量。它对assignWindows()方法的实现如下:
@Override public Collection<TimeWindow> assignWindows( Object element, long timestamp, WindowAssignerContext context) { // Long.MIN_VALUE is currently assigned when no timestamp is present if (timestamp > Long.MIN_VALUE) { if (staggerOffset == null) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } long start = TimeWindow.getWindowStartWithOffset( timestamp, (globalOffset + staggerOffset) % size, size); // 返回构建好起止时间的TimeWindow return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }
设置窗口触发器Trigger
@Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }
WindowAssigner与其主要实现类的关系如下:

这些类的含义分别如下
- GlobalWindows:将所有元素分配进同一个窗口的全局窗口分配器。
- SlidingEventTimeWindows:基于事件时间的滑动窗口分配器。
- SlidingProcessingTimeWindows:基于处理时间的滑动窗口分配器。
- TumblingEventTimeWindows:基于事件时间的滚动窗口分配器。
- TumblingProcessingTimeWindows:基于处理时间的滚动窗口分配器。
- EventTimeSessionWindows:基于事件时间的会话窗口分配器。
- ProcessingTimeSessionWindows:基于处理时间的会话窗口分配器。
Trigger
Trigger表示窗口触发器。它是一个抽象类,主要定义了下面3个方法用于确定窗口何时触发计算:
// 每个元素到来时触发 public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; // 处理时间的定时器触发时 public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; // 事件时间的定时器触发时调用 public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
这3个方法的返回结果为TriggerResult对象。TriggerResult是一个枚举类,包含两个boolean类型的字段fire和purge,分别表示窗口是否触发计算和窗口内的元素是否需要清空。
CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true); TriggerResult(boolean fire, boolean purge) { this.purge = purge; this.fire = fire; }
窗口触发器的实现由用户根据业务需求自定义。Flink默认基于事件时间的触发器为EventTimeTrigger,其三个方法处理如下
@Override public TriggerResult onElement( Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // 如果水印已经超过窗口,则立即触发 return TriggerResult.FIRE; } else { // 注册事件时间定时器 ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } /* * 处理时间,窗口不触发计算也不清空内部元素。 */ @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; }
Trigger与其主要实现类的继承关系

这些类的含义如下
- CountTrigger:元素数达到设置的个数时触发计算的触发器。
- DeltaTrigger:基于DeltaFunction和设置的阈值触发计算的触发器。
- EventTimeTrigger:基于事件时间的触发器。
- ProcessingTimeTrigger:基于处理时间的触发器。
- PurgingTrigger:可包装其他触发器的清空触发器。
- ContinuousEventTimeTrigger:基于事件时间并按照一定的时间间隔连续触发计算的触发器。
- ContinuousProcessingTimeTrigger:基于处理时间并按照一定的时间间隔连续触发计算的触发器。
windowOperator
从WindowedStream的构造函数中,会生成WindowOperatorBuilder,该类可以返回WindowOperator,这两个类负责窗口分配器、窗口触发器和窗口剔除器这些组件在运行时的协同工作。
对于WindowOperator,除了窗口分配器和窗口触发器的相关字段,可以先了解下面两个字段。
// StateDescriptor类型,表示窗口状态描述符。 private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor; // 表示窗口的状态,窗口内的元素都在其中维护。 private transient InternalAppendingState<K, W, IN, ACC, ACC> windowState;
窗口中的元素并没有保存在Window对象中,而是维护在windowState中。windowStateDescriptor则是创建windowState所需用到的描述符。
当有元素到来时,会调用WindowOperator的processElement()方法:
public void processElement(StreamRecord<IN> element) throws Exception { // 分配窗口 final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); ... if (windowAssigner instanceof MergingWindowAssigner) { // Session Window的情况 ... } else { for (W window: elementWindows) { // 非Session Window的情况 ... // 将Window对象设置为namespace并添加元素到windowState中 windowState.setCurrentNamespace(window); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = window; // 获取TriggerResult,确定接下来是否需要触发计算或清空窗口 TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } // 触发计算 emitWindowContents(window, contents); } if (triggerResult.isPurge()) { // 清空窗口 windowState.clear(); } ... } } ... }
在处理时间或事件时间的定时器触发时,会调用WindowOperator的onProcessingTime()方法或onEventTime()方法,其中的逻辑与onElement()方法的大同小异。
Watermarks
水位线(watermark)是选用事件时间来进行数据处理时特有的概念。它的本质就是时间戳,从上游流向下游,表示系统认为数据中的事件时间在该时间戳之前的数据都已到达。
Flink中,Watermark类表示水位。
/** Creates a new watermark with the given timestamp in milliseconds. */ public Watermark(long timestamp) { this.timestamp = timestamp; }
watermark的生成有两种方式,这里不赘述,主要讲述下基于配置的策略生成watermark的方式。如下的代码是比较常见的配置:
// 分配事件时间与水印 .assignTimestampsAndWatermarks( // forBoundedOutOfOrderness 会根据事件的时间戳和允许的最大乱序时间生成水印。 // Duration 设置了最大乱序时间为1秒。这意味着 Flink 将允许在这1秒的时间范围内的事件不按照事件时间的顺序到达,这个时间段内的事件会被认为是"有序的"。 WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1)) // 设置事件时间分配器,从Event对象中提取时间戳作为事件时间 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.timestamp; } }));
在Flink内部,会根据配置的策略调用BoundedOutOfOrdernessWatermarks生成watermark。该类的代码如下:
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> { /** The maximum timestamp encountered so far. */ private long maxTimestamp; /** The maximum out-of-orderness that this watermark generator assumes. */ private final long outOfOrdernessMillis; /** * Creates a new watermark generator with the given out-of-orderness bound. * * @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps. */ public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) { checkNotNull(maxOutOfOrderness, "maxOutOfOrderness"); checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative"); this.outOfOrdernessMillis = maxOutOfOrderness.toMillis(); // start so that our lowest watermark would be Long.MIN_VALUE. this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; } // ------------------------------------------------------------------------ @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { // 每条数据都会更新最大值 maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发送 watermark 逻辑 output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); } }
onEvent决定每次事件都会取得最大的事件时间更新;onPeriodicEmit则是周期性的更新并传递到下游。
AbstractStreamOperator
WatermarkGenerator接口的调用是在AbstractStreamOperator抽象类的子类TimestampsAndWatermarksOperator中。其生命周期open函数与每个数据到来的处理函数processElement,如下:
@Override public void open() throws Exception { super.open(); timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup); watermarkGenerator = emitProgressiveWatermarks ? watermarkStrategy.createWatermarkGenerator(this::getMetricGroup) : new NoWatermarksGenerator<>(); wmOutput = new WatermarkEmitter(output); watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0 && emitProgressiveWatermarks) { final long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } } @Override public void processElement(final StreamRecord<T> element) throws Exception { final T event = element.getValue(); final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE; // 从分配器中提取事件时间戳 final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp); element.setTimestamp(newTimestamp); output.collect(element); // 调用水印生成器 watermarkGenerator.onEvent(event, newTimestamp, wmOutput); }
从方法的入参可以看出来 flink 算子间的数据流动是 StreamRecord 对象。它对数据的处理逻辑是什么都不做直接向下游发送,然后调用 onEvent 记录最大时间戳,也就是说:flink 是先发送数据再生成 watermark,watermark 永远在生成它的数据之后。
总结
上面的一系列相关代码,只是冰山一角,暂时只是把关键涉及到的部分捋了一下。最后画个图,展示其大致思路。

参考: