flink window原理(Flink Window基本概念与实现原理)

操作系统 2023-07-23 16:25www.caominkang.comlinux操作系统

Windo意为窗口。在流处理系统中数据源源不断流入到系统,我们可以逐条处理流入的数据,也可以按一定规则一次处理流中的多条数据。当处理数据时程序需要知道什么时候开始处理、处理哪些数据。窗口提供了这样一种依据,决定了数据何时开始处理。

Flink内置Windo

Flink有3个内置Windo

  • 以事件数量驱动的Count Windo
  • 以会话间隔驱动的Session Windo
  • 以时间驱动的Time Windo

本文围绕这3个内置窗口展开讨论,我们了解这3个窗口在运行时产生的现象,再讨论它们的实现原理。

Count Windo

计数窗口,采用事件数量作为窗口处理依据。计数窗口分为滚动和滑动两类,使用keyedStream.countWindo实现计数窗口定义。

  • Tumbling Count Windo 滚动计数窗口
  • 例子以用户分组,当每位用户有3次付款事件时计算一次该用户付款总金额。下图中“消息A、B、C、D”代表4位不同用户,我们以A、B、C、D分组并计算金额。
/ 每3个事件,计算窗口内数据 / keyedStream.countWindo(3);

  • Sliding Count Windo 滑动计数窗口
  • 例子一位用户每3次付款事件计算最近4次付款事件总金额。
/ 每3个事件,计算最近4个事件消息 / keyedStream.countWindo(4,3);

Session Windo

会话窗口,采用会话持续时长作为窗口处理依据。设置指定的会话持续时长时间,在这段时间中不再出现会话则认为超出会话时长。

例子每只股票超过2秒没有交易事件时计算窗口内交易总金额。下图中“消息A、消息B”代表两只不同的股票。

/ 会话持续2秒。当超过2秒不再出现会话认为会话结束 / keyedStream.indo(ProcessingTimeSessionWindos.ithGap(Time.seconds(2)))

Time Windo

时间窗口,采用时间作为窗口处理依据。时间窗分为滚动和滑动两类,使用keyedStream.timeWindo实现时间窗定义。

  • Tumbling Time Windo 滚动时间窗口
/ 每1分钟,计算窗口数据 / keyedStream.timeWindo(Time.minutes(1));

  • Sliding Time Windo 滑动时间窗口
/ 每半分钟,计算最近1分钟窗口数据 / keyedStream.timeWindo(Time.minutes(1), Time.seconds(30));

Flink Windo组件

Flink Windo使用3个组件协同实现了内置的3个窗口。通过对这3个组件不同的组合,可以满足许多场景的窗口定义。

WindoAssigner组件为数据分配窗口、Trigger组件决定如何处理窗口中的数据、借助Evictor组件实现灵活清理窗口中数据时机。

WindoAssigner

当有数据流入到Windo Operator时需要按照一定规则将数据分配给窗口,WindoAssigner为数据分配窗口。下面代码片段是WindoAssigner部分定义,assignWindos方法定义返回的结果是一个集合,也就是说数据允许被分配到多个窗口中。

/ WindoAssigner关键接口定义 / public abstract class WindoAssigner implements Serializable { / 分配数据到窗口集合并返回 / public abstract Collection assignWindos(T element, long timestamp, WindoAssignerContext context); }

Flink内置WindoAssigner

Flink针对不同窗口类型实现了相应的WindoAssigner。Flink 1.7.0继承关系如下图

Trigger

Trigger触发器,它定义了3个触发动作,并且定义了触发动作处理完毕后的返回结果。返回结果交给Windo Operator后由Windo Operator决定后续操作。也就是说,Trigger通过具体的动作处理结果决定窗口是否应该被处理、被清除、被处理+清除、还是什么都不做。

/ Trigger关键接口定义 / public abstract class Trigger implements Serializable { / 新的数据进入窗口时触发 / public abstract TriggerResult onElement(T element, long timestamp, W indo, TriggerContext ctx) thros Exception; / 处理时间计数器触发 / public abstract TriggerResult onProcessingTime(long time, W indo, TriggerContext ctx) thros Exception; / 事件时间计数器触发 / public abstract TriggerResult onEventTime(long time, W indo, TriggerContext ctx) thros Exception; }

当有数据流入Windo Operator时会触发onElement方法、当处理时间和事件时间生效时会触发onProcessingTime和onEventTime方法。每个触发动作的返回结果用TriggerResult定义。

TriggerResult返回类型及说明

Trigger触发运算后返回处理结果,处理结果使用TriggerResult枚举表示。

public enum TriggerResult { CONTINUE,FIRE,PURGE,FIRE_AND_PURGE; }

Flink内置Trigger

Flink的内置窗口(Counter、Session、Time)有自己的触发器实现。下表为不同窗口使用的触发器。

Evictor

Evictor驱逐者,如果定义了Evictor当执行窗口处理前会删除窗口内指定数据再交给窗口处理,或等窗口执行处理后再删除窗口中指定数据。

public interface Evictor extends Serializable { / 在窗口处理前删除数据 / void evictBefore(Iterable> elements, int size, W indo, EvictorContext evictorContext); / 在窗口处理后删除数据 / void evictAfter(Iterable> elements, int size, W indo, EvictorContext evictorContext); }

Flink内置Evictor

实现原理

通过KeyedStream可以直接创建Count Windo和Time Windo。他们最终都是基于indo(WindoAssigner)方法创建,在indo方法中创建WindoedStream实例,参数使用当前的KeyedStream对象和指定的WindoAssigner。

/ 依据WindoAssigner实例化WindoedStream / public WindoedStream indo(WindoAssigner assigner) { return ne WindoedStream<>(this, assigner); } / WindoedStream构造器 / public WindoedStream(KeyedStream input, WindoAssigner indoAssigner) { this.input = input; this.indoAssigner = indoAssigner; this.trigger = indoAssigner.getDefaultTrigger(input.getExecutionEnvironment()); }

构造器执行完毕后,WindoedStream创建完成。构造器中初始化了3个属性。默认情况下trigger属性使用WindoAssigner提供的DefaultTrigger作为初始值。

,WindoedStream提供了trigger方法用来覆盖默认的trigger。Flink内置的计数窗口就使用indoedStream.trigger方法覆盖了默认的trigger。

public WindoedStream trigger(Trigger trigger) { if (indoAssigner instanceof MergingWindoAssigner && !trigger.canMerge()) { thro ne UnsupportedOperationException(); } if (indoAssigner instanceof BaseAlignedWindoAssigner) { thro ne UnsupportedOperationException(); } this.trigger = trigger; return this; }

在WindoedStream中还有一个比较重要的属性evictor,可以通过evictor方法设置。

public WindoedStream evictor(Evictor evictor) { if (indoAssigner instanceof BaseAlignedWindoAssigner) { thro ne UnsupportedOperationException(); } this.evictor = evictor; return this; }

WindoedStream实现中根据evictor属性是否空(null == evictor)决定是创建WindoOperator还是EvictingWindoOperator。EvictingWindoOperator继承自WindoOperator,它主要扩展了evictor属性以及相关的逻辑处理。

public class EvictingWindoOperator extends WindoOperator { private final Evictor evictor; }

Evictor定义了清理数据的时机。在EvictingWindoOperator的emitWindoContents方法中,实现了清理数据逻辑调用。这也是EvictingWindoOperator与WindoOperator的主要区别。「在WindoOperator中压根就没有evictor的概念」

private void emitWindoContents(W indo, Iterable> contents, ListState> indoState) thros Exception { / Windo处理前数据清理 / evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); / Windo处理 / userFunction.process(triggerContext.key, triggerContext.indo, processContext, projectedContents, timestampedCollector); / Windo处理后数据清理 / evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp)); }

Count Windo API

下面代码片段是KeyedStream提供创建Count Windo的API。

/ 滚动计数窗口 / public WindoedStream countWindo(long size) { return indo(GlobalWindos.create()).trigger(PurgingTrigger.of(CountTrigger.of(size))); } / 滑动计数窗口 / public WindoedStream countWindo(long size, long slide) { return indo(GlobalWindos.create()) .evictor(CountEvictor.of(size)) .trigger(CountTrigger.of(slide)); }

滚动计数窗口与滑动计数窗口有几个差异

  • 入参不同
  • 滑动窗口使用了evictor组件
  • 两者使用的trigger组件不同

下面我们对这几点差异做深入分析,看一看他们是如何影响滚动计数窗口和滑动计数窗口的。

Count Windo Assigner

通过方法indo(GlobalWindos.create())创建WindoedStream实例,滚动计数窗口处理和滑动计数窗口处理都是基于GlobalWindos作为WindoAssigner来创建窗口处理器。GlobalWindos将所有数据都分配到同一个GlobalWindo中。「这里需要注意GlobalWindos是一个WindoAssigner,而GlobalWindo是一个Windo」

/ GlobalWindos是一个WindoAssigner实现,这里只展示实现assignWindos的代码片段 / public class GlobalWindos extends WindoAssigner { / 返回一个GlobalWindo / public Collection assignWindos(Object element, long timestamp, WindoAssignerContext context) { return Collections.singletonList(GlobalWindo.get()); } }

GlobalWindo继承了Windo,表示为一个窗口。对外提供get()方法返回GlobalWindo实例,并且是个全局单例。所以当使用GlobalWindos作为WindoAssigner时,所有数据将被分配到一个窗口中。

/ GlobalWindo是一个Windo / public class GlobalWindo extends Windo { private static final GlobalWindo INSTANCE = ne GlobalWindo(); / 永远返回GlobalWindo单例 / public static GlobalWindo get() { return INSTANCE; } }

Count Windo Trigger

滚动计数窗口创建时使用PurgingTrigger.of(CountTrigger.of(size))覆盖了GlobalWindos默认的Trigger,而滑动计数窗口创建时使用CountTrigger.of(size)覆盖了GlobalWindos默认的Trigger。

PurgingTrigger是一个代理模式的Trigger实现,在计数窗口中PurgingTrigger代理了CountTrigger。

/ PurgingTrigger代理的Trigger / private Trigger nestedTrigger; / PurgingTrigger私有构造器 / private PurgingTrigger(Trigger nestedTrigger) { this.nestedTrigger = nestedTrigger; } / 为代理的Trigger构造一个PurgingTrigger实例 / public static PurgingTrigger of(Trigger nestedTrigger) { return ne PurgingTrigger<>(nestedTrigger); }

在这里比较一下PurgingTrigger.onElement和CountTrigger.onElement方法实现,帮助理解PurgingTrigger的作用。

/ CountTrigger实现 / public TriggerResult onElement(Object element, long timestamp, W indo, TriggerContext ctx) thros Exception { ReducingState count = ctx.getPartitionedState(stateDesc); count.add(1L); if (count.get() >= maxCount) { count.clear(); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } / PurgingTrigger实现 / public TriggerResult onElement(T element, long timestamp, W indo, TriggerContext ctx) thros Exception { TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, indo, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; }

在CountTrigger实现中,当事件流入窗口后计数+1,之后比较窗口中事件数是否大于设定的最大数量,一旦大于最大数量返回FIRE。也就是说只处理窗口数据,不做清理。

在PurgingTrigger实现中,依赖CountTrigger的处理逻辑,但区别在于当CounterTrigger返回FIRE时PurgingTrigger返回FIRE_AND_PURGE。也就是说不仅处理窗口数据,还做数据清理。通过这种方式实现了滚动计数窗口数据不重叠。

Count Windo Evictor

滚动计数窗口和滑动计数窗口另一个区别在于滑动计数窗口通过indoedStream.evictor(CountEvictor.of(size))方法设置了Evictor,而滚动窗口并没有设置Evictor。

滑动计数窗口依赖Evictor组件在窗口处理前清除了指定数量以外的数据,再交给窗口处理。通过这种方式实现了窗口计算最近指定次数的事件数量。

Time Windo API

下面代码片段是KeyedStream中提供创建Time Windo的API。

/ 创建滚动时间窗口 / public WindoedStream timeWindo(Time size) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return indo(TumblingProcessingTimeWindos.of(size)); } else { return indo(TumblingEventTimeWindos.of(size)); } } / 创建滑动时间窗口 / public WindoedStream timeWindo(Time size, Time slide) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return indo(SlidingProcessingTimeWindos.of(size, slide)); } else { return indo(SlidingEventTimeWindos.of(size, slide)); } }

创建TimeWindo时会根据Flink应用当前时间类型environment.getStreamTimeCharacteristic()来决定使用哪个WindoAssigner创建窗口。

Flink对时间分成了3类。处理时间、摄入时间、事件时间。使用TimeCharacteristic枚举定义。

public enum TimeCharacteristic { / 处理时间 / ProcessingTime, / 摄入时间 / IngestionTime, / 事件时间 / EventTime }

对于Flink的3个时间概念,我们目前只需要了解

  • 处理时间(TimeCharacteristic.ProcessingTime)就是运行Flink环境的系统时钟产生的时间
  • 事件时间(TimeCharacteristic.EventTime)是业务上产生的时间,由数据自身携带
  • 摄入时间(TimeCharacteristic.IngestionTime)是数据进入到Flink的时间,它在底层实现上与事件时间相同。

Time Windo Assigner

下面的表格中展示了窗口类型和时间类型对应的WindoAssigner的实现类

我们以一个TumblingProcessingTimeWindos和一个SlidingEventTimeWindos为例,讨论它的实现原理。

TumblingProcessingTimeWindos

TumblingProcessingTimeWindos基于处理时间的滚动时间窗口分配器,它是一个WindoAssigner。Flink提供两个接口初始化TumblingProcessingTimeWindos

public static TumblingProcessingTimeWindos of(Time size) { return ne TumblingProcessingTimeWindos(size.toMilliseconds(), 0); } public static TumblingProcessingTimeWindos of(Time size, Time offset) { return ne TumblingProcessingTimeWindos(size.toMilliseconds(), offset.toMilliseconds()); }

不管使用哪种方式初始化TumblingProcessingTimeWindos,最终都会调用同一个构造方法初始化,构造方法初始化size和offset两个属性。

/ TumblingProcessingTimeWindos构造器 / private TumblingProcessingTimeWindos(long size, long offset) { if (offset < 0 || offset >= size) { thro ne IllegalArgumentException(); } this.size = size; this.offset = offset; }

TumblingProcessingTimeWindos是一个WindoAssigner,所以它实现了assignWindos方法来为流入的数据分配窗口。

public Collection assignWindos(Object element, long timestamp, WindoAssignerContext context) { final long no = context.getCurrentProcessingTime(); long start = TimeWindo.getWindoStartWithOffset(no, offset, size); return Collections.singletonList(ne TimeWindo(start, start + size)); }

第一步assignWindos获得系统当前时间戳,context.getCurrentProcessingTime();最终实现实际是调用System.currentTimeMillis()。

第二步执行TimeWindo.getWindoStartWithOffset(no, offset, size);这个方法根据当前时间、偏移量、设置的间隔时间最终计算窗口起始时间。

第三步根据起始时间和结束时间创建一个新的窗口ne TimeWindo(start, start + size)并返回。

比如,电脑维修网希望每10秒处理一次窗口数据keyedStream.timeWindo(Time.seconds(10))。当数据源源不断的流入Windo Operator时,它会按10秒切割一个时间窗。

我们假设数据在2019年1月1日 12:00:07到达,那么窗口以下面方式切割(请注意,窗口是左闭右开)。

Windo[2019年1月1日 12:00:00, 2019年1月1日 12:00:10)

如果在2019年1月1日 12:10:09又一条数据到达,窗口是这样的

Windo[2019年1月1日 12:10:00, 2019年1月1日 12:10:10)

如果我们电脑维修网希望从第15秒开始,每过1分钟计算一次窗口数据,这种场景需要用到offset。基于处理时间的滚动窗口可以这样写

keyedStream.indo(TumblingProcessingTimeWindos.of(Time.minutes(1), Time.seconds(15)))

我们假设数据从2019年1月1日 12:00:14到达,那么窗口以下面方式切割

Windo[2019年1月1日 11:59:15, 2019年1月1日 12:00:15)

如果在2019年1月1日 12:00:16又一数据到达,那么窗口以下面方式切割

Windo[2019年1月1日 12:00:15, 2019年1月1日 12:01:15)

TumblingProcessingTimeWindos.assignWindos方法每次都会返回一个新的窗口,也就是说窗口是不重叠的。但因为TimeWindo实现了equals方法,所以通过计算后start, start + size相同的数据,在逻辑上是同一个窗口。

public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } TimeWindo indo = (TimeWindo) o; return end == indo.end && start == indo.start; }

SlidingEventTimeWindos

SlidingEventTimeWindos基于事件时间的滑动时间窗口分配器,它是一个WindoAssigner。Flink提供两个接口初始化SlidingEventTimeWindos

public static SlidingEventTimeWindos of(Time size, Time slide) { return ne SlidingEventTimeWindos(size.toMilliseconds(), slide.toMilliseconds(), 0); } public static SlidingEventTimeWindos of(Time size, Time slide, Time offset) { return ne SlidingEventTimeWindos(size.toMilliseconds(), slide.toMilliseconds(),offset.toMilliseconds() % slide.toMilliseconds()); }

同样,不管使用哪种方式初始化SlidingEventTimeWindos,最终都会调用同一个构造方法初始化,构造方法初始化三个属性size、slide和offset。

protected SlidingEventTimeWindos(long size, long slide, long offset) { if (offset < 0 || offset >= slide || size <= 0) { thro ne IllegalArgumentException(); } this.size = size; this.slide = slide; this.offset = offset; }

SlidingEventTimeWindos是一个WindoAssigner,所以它实现了assignWindos方法来为流入的数据分配窗口。

public Collection assignWindos(Object element, long timestamp, WindoAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List indos = ne ArrayList<>((int) (size / slide)); long lastStart = TimeWindo.getWindoStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size;start -= slide) { indos.add(ne TimeWindo(start, start + size)); } return indos; } else { thro ne RuntimeException(); } }

与基于处理时间的WindoAssigner不同,基于事件时间的WindoAssigner不依赖于系统时间,而是依赖于数据本身的事件时间。在assignWindos方法中第二个参数timestamp就是数据的事件时间。

第一步assignWindos方法会先初始化一个List,大小是size / slide。这个集合用来存放时间窗对象并作为返回结果。

第二步执行TimeWindo.getWindoStartWithOffset(timestamp, offset, slide);计算窗口起始时间。

第三步根据事件时间、滑动大小和窗口大小计算并生成数据能落入的窗口ne TimeWindo(start, start + size),加入到List集合并返回。「因为是滑动窗口一个数据可能落在多个窗口」

比如,电脑维修网希望每5秒滑动一次处理最近10秒窗口数据keyedStream.timeWindo(Time.seconds(10), Time.seconds(5))。当数据源源不断流入Windo Operator时,会按10秒切割一个时间窗,5秒滚动一次。

我们假设一条付费事件数据付费时间是2019年1月1日 17:11:24,那么这个付费数据将落到下面两个窗口中(请注意,窗口是左闭右开)。

Windo[2019年1月1日 17:11:20, 2019年1月1日 17:11:30) Windo[2019年1月1日 17:11:15, 2019年1月1日 17:11:25)

Time Windo Trigger

Flink API在创建Time Windo时没有使用indoStream.trigger方法覆盖默认Trigger。

TumblingProcessingTimeWindos使用ProcessingTimeTrigger作为默认Trigger。ProcessingTimeTrigger在onElement的策略是永远返回CONTINUE,也就是说它不会因为数据的流入触发窗口计算和清理。在返回CONTINUE前调用registerProcessingTimeTimer(indo.maxTimestamp());注册一个定时器,并且逻辑相同窗口只注册一次,事件所在窗口的结束时间与系统当前时间差决定了定时器多久后触发。

ScheduledThreadPoolExecutor.schedule(ne TriggerTask(), timeEndTime - systemTime, TimeUnit.MILLISECONDS);

定时器一旦触发会回调Trigger的onProcessingTime方法。ProcessingTimeTrigger中实现的onProcessingTime直接返回FIRE。也就是说系统时间大于等于窗口最大时间时,通过回调方式触发窗口计算。但因为返回的是FIRE只是触发了窗口计算,并没有做清除。

SlidingEventTimeWindos使用EventTimeTrigger作为默认Trigger。事件时间、摄入时间与处理时间在时间概念上有一点不同,处理时间处理依赖的是系统时钟生成的时间,而事件时间和摄入时间依赖的是Watermark(水印)。我们现在只需要知道水印是一个时间戳,可以由Flink以固定的时间间隔发出,或由开发人员根据业务自定义。水印用来衡量处理程序的时间进展。

EventTimeTrigger的onElement方法中比较窗口的结束时间与当前水印时间,如果窗口结束时间已小于或等于当前水印时间立即返回FIRE。

「个人理解这是由于时间差问题导致的窗口时间小于或等于当前水印时间,正常情况下如果窗口结束时间已经小于水印时间则数据不会被处理,也不会调用onElement」

如果窗口结束时间大于当前水印时间,调用registerEventTimeTimer(indo.maxTimestamp())注册一个事件后直接返回CONTINUE。EventTime注册事件没有使用Scheduled,因为它依赖水印时间。所以在注册时将逻辑相同的时间窗封装为一个特定对象添加到一个排重队列,并且相同窗口对象只添加一次。

上面提到水印是以固定时间间隔发出或由开发人员自定义的,Flink处理水印时从排重队列头获取一个时间窗对象与水印时间戳比较,一旦窗口时间小于或等于水印时间回调trigger的onEventTime。

EventTimeTrigger中onEventTime并不是直接返回FIRE,而是判断窗口结束时间与获取的时间窗对象时间做比较,仅当时间相才返回FIRE,其他情况返回CONTINUE。「个人理解这么做是为了满足滑动窗口的需求,因为滑动窗口在排重队列中存在两个不同的对象,而两个窗口对象的时间可能满足回调条件」

Time Windo Evictor

Flink内置Time Windo实现没有使用Evictor。

Session Windo API

KeyedStream中没有为Session Windo提供类似Count Windon和Time Windo一样能直接使用的API。我们可以使用indo(WindoAssigner assigner)创建Session Windo。

比如创建一个基于处理时间,时间间隔为2秒的SessionWindo可以这样实现

keyedStream.indo(ProcessingTimeSessionWindos.ithGap(Time.seconds(2)))

Assigner

Flink内置的Session Windo Assigner全部继承MergingWindoAssigner。下图展示了MergingWindoAssigner的上下结构关系。

MergingWindoAssigner继承了WindoAssigner,所以它具备分配时间窗的能力。MergingWindoAssigner自身是一个可以merge的Windo,它的内部定义了一个mergeWindos抽象方法以及merge时的回调定义。

public abstract void mergeWindos(Collection indos, MergeCallback callback); public interface MergeCallback { void merge(Collection toBeMerged, W mergeResult); }

我们以ProcessingTimeSessionWindos为例介绍Session Windo。ProcessingTimeSessionWindos提供了一个静态方法用来初始化ProcessingTimeSessionWindos

public static ProcessingTimeSessionWindos ithGap(Time size) { return ne ProcessingTimeSessionWindos(size.toMilliseconds()); }

静态方法ithGap接收一个时间参数,用来描述时间间隔。并调用构造方法将时间间隔赋值给sessionTimeout属性。

protected ProcessingTimeSessionWindos(long sessionTimeout) { if (sessionTimeout <= 0) { thro ne IllegalArgumentException(); } this.sessionTimeout = sessionTimeout; }

ProcessingTimeSessionWindos是一个WindoAssigner,所以它实现了数据分配窗口的能力。

public Collection assignWindos(Object element, long timestamp, WindoAssignerContext context) { long currentProcessingTime = context.getCurrentProcessingTime(); return Collections.singletonList(ne TimeWindo(currentProcessingTime, currentProcessingTime + sessionTimeout)); }

ProcessingTimeSessionWindos会为每个数据都分配一个新的时间窗口。由于是基于处理时间,所以窗口的起始时间就是系统当前时间,而结束时间是系统当前时间+设置的时间间隔。通过起始时间和结束时间确定了窗口的时间范围。

Trigger

如果在代码中我们不手动覆盖Trigger,那么将使用ProcessingTimeSessionWindos默认的ProcessingTimeTrigger

public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }

ProcessingTimeTrigger在基于处理时间的Time Windo介绍过,它通过注册、onProcessorTime回调方式触发窗口计算,这里不再讨论。

Evictor

Session Windo不由Flink API控制生成,完全取决于客户端如何创建。在创建Windo实例后可以通过调用evictor方法并传入Flink内置的Evictor或自己实现的Evictor。

Merging

Session Windo继承MergingWindoAssigner,MergingWindoAssigner继承WindoAssigner。所以本质上Session Windo还是一个WindoAssigner,但因继承了MergingWindoAssigner使得自己具有了一个「可以合并时间窗口」的特性。

public void mergeWindos(Collection indos, MergeCallback c) { TimeWindo.mergeWindos(indos, c); }

Session Windo处理流程大致是这样

  1. 使用WindoAssigner为流入的数据分配窗口
  2. Merge窗口,将存在交集的窗口合并,取最小时间和最大时间作为窗口的起始和关闭。假设有两条数据流入系统后,通过WindoAssigner分配的窗口分别是
  3. 数据AWindo[2019年1月1日 10:00:00, 2019年1月1日 10:20:00)
  4. 数据BWindo[2019年1月1日 10:05:00, 2019年1月1日 10:25:00)
  5. 经过合并后,使用数据A的起始时间和数据B的结束时间作为节点,窗口时间变为了
  6. [2019年1月1日 10:00:00, 2019年1月1日 10:25:00)
  7. 执行Trigger.onMerge,为合并后的窗口注册回调事件
  8. 移除其他注册的回调事件
  9. Windo State合并
  10. 开始处理数据,执行Trigger.onElement
  11. …后续与其他Windo处理一样

可以看到,Session Windo与Time Windo类似,通过注册回调方式触发数据处理。但不同的是Session Windo通过不断为新流入的数据做Merge操作来改变回调时间点,以实现Session Windo的特性。

  • Windo Operator创建
  • Windo处理流程由WindoOperator或EvictingWindoOperator控制,他们的关系及区别体现在以下几点
  1. EvictingWindoOperator继承自WindoOperator,所以EvictingWindoOperator是一个WindoOperator,具备WindoOperator的特性。
  2. 清理窗口数据的机制不同,EvictingWindoOperator内部依赖Evictor组件,而WindoOperator内部不使用Evictor。这也导致它们两个Operator初始化时的差异

  • MergeWindo特殊处理
  • 可以合并窗口的WindoAssigner会继承MergingWindoAssigner。当数据流入Windo Operator后,根据WindoAssigner是否为一个MergingWindoAssigner决定了处理流程。

  • 窗口生命周期
  • Flink内置的窗口生命周期是不同的,下表描述了他们直接的差异

  • 侧路输出
  • 当Flink应用采用EventTime作为时间机制时,Windo不会处理延迟到达的数据,也就是说不处理在水印时间戳之前的数据。Flink提供了一个SideOutput机制可以处理这些延迟到达的数据。通过WindoedStream.sideOutputLateData方法实现侧路输出。
  • 自定义窗口
  • Flink内置窗口利用WindoAssigner、Trigger、Evictor3个组件的相互组合实现了多种非常强大的功能,我们也可以尝试通过组件实现一个自定义的Windo。由于篇幅原因,自定义窗口下篇再细聊。

END

作者TalkingData 史天舒

Copyright © 2016-2025 www.caominkang.com 曹敏电脑维修网 版权所有 Power by