徹底搞清Flink中的Window機制

窗口

在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個窗口,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進行計算。

Flink 認(rèn)為 Batch 是 Streaming 的一個特例,所以 Flink 底層引擎是一個流式引擎,在上面實現(xiàn)了流處理和批處理。而窗口(window)就是從 Streaming 到 Batch 的一個橋梁。

  • 一個Window代表有限對象的集合。一個窗口有一個最大的時間戳,該時間戳意味著在其代表的某時間點——所有應(yīng)該進入這個窗口的元素都已經(jīng)到達(dá)
  • Window就是用來對一個無限的流設(shè)置一個有限的集合,在有界的數(shù)據(jù)集上進行操作的一種機制。window又可以分為基于時間(Time-based)的window以及基于數(shù)量(Count-based)的window。
  • Flink DataStream API提供了Time和Count的window,同時增加了基于Session的window。同時,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用戶自定義window。

窗口的組成

窗口分配器

  • assignWindows將某個帶有時間戳timestamp的元素element分配給一個或多個窗口,并返回窗口集合
  • getDefaultTrigger 返回跟WindowAssigner關(guān)聯(lián)的默認(rèn)觸發(fā)器
  • getWindowSerializer返回WindowAssigner分配的窗口的序列化器
    • 窗口分配器定義如何將數(shù)據(jù)元分配給窗口。這是通過WindowAssigner 在window(...)(對于被Keys化流)或windowAll()(對于非被Keys化流)調(diào)用中指定您的選擇來完成的。
  • WindowAssigner負(fù)責(zé)將每個傳入數(shù)據(jù)元分配給一個或多個窗口。Flink帶有預(yù)定義的窗口分配器,用于最常見的用例
    即翻滾窗口, 滑動窗口,會話窗口和全局窗口。
  • 您還可以通過擴展WindowAssigner類來實現(xiàn)自定義窗口分配器。
  • 所有內(nèi)置窗口分配器(全局窗口除外)都根據(jù)時間為窗口分配數(shù)據(jù)元,這可以是處理時間或事件時間。

State

  • 狀態(tài),用來存儲窗口內(nèi)的元素,如果有 AggregateFunction,則存儲的是增量聚合的中間結(jié)果。

窗口函數(shù)

選擇合適的計算函數(shù),減少開發(fā)代碼量提高系統(tǒng)性能

增量聚合函數(shù)(窗口只維護狀態(tài))

  • ReduceFunction
  • AggregateFunction
  • FoldFunction

全量聚合函數(shù)(窗口維護窗口內(nèi)的數(shù)據(jù))

  • ProcessWindowFunction
    • 全量計算
    • 支持功能更加靈活
    • 支持狀態(tài)操作

觸發(fā)器

image-20210202200655485

  • EventTimeTrigger基于事件時間的觸發(fā)器,對應(yīng)onEventTime
  • ProcessingTimeTrigger
    基于當(dāng)前系統(tǒng)時間的觸發(fā)器,對應(yīng)onProcessingTime
    ProcessingTime 有最好的性能和最低的延遲。但在分布式計算環(huán)境中ProcessingTime具有不確定性,相同數(shù)據(jù)流多次運行有可能產(chǎn)生不同的計算結(jié)果。
  • ContinuousEventTimeTrigger
  • ContinuousProcessingTimeTrigger
  • CountTrigger
    • Trigger確定何時窗口函數(shù)準(zhǔn)備好處理窗口(由窗口分配器形成)。每個都有默認(rèn)值。
      如果默認(rèn)觸發(fā)器不符合您的需要,您可以使用指定自定義觸發(fā)器。WindowAssignerTriggertrigger(...)
    • 觸發(fā)器界面有五種方法可以Trigger對不同的事件做出反應(yīng):
      • onElement()為添加到窗口的每個數(shù)據(jù)元調(diào)用該方法。
      • onEventTime()在注冊的事件時間計時器觸發(fā)時調(diào)用該方法。
      • onProcessingTime()在注冊的處理時間計時器觸發(fā)時調(diào)用該方法。
      • 該onMerge()方法與狀態(tài)觸發(fā)器相關(guān),并且當(dāng)它們的相應(yīng)窗口合并時合并兩個觸發(fā)器的狀態(tài),例如當(dāng)使用會話窗口時。
      • 最后,該clear()方法在移除相應(yīng)窗口時執(zhí)行所需的任何動作。
    • 默認(rèn)觸發(fā)器
      • 默認(rèn)觸發(fā)器GlobalWindow是NeverTrigger從不觸發(fā)的。因此,在使用時必須定義自定義觸發(fā)器GlobalWindow。
      • 通過使用trigger()您指定觸發(fā)器會覆蓋a的默認(rèn)觸發(fā)器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows則不再根據(jù)時間進度獲取窗口,
        而是僅按計數(shù)。現(xiàn)在,如果你想根據(jù)時間和數(shù)量做出反應(yīng),你必須編寫自己的自定義觸發(fā)器。
      • event-time窗口分配器都有一個EventTimeTrigger作為默認(rèn)觸發(fā)器。該觸發(fā)器在watermark通過窗口末尾時出發(fā)。

觸發(fā)器分類

CountTrigger

一旦窗口中的數(shù)據(jù)元數(shù)量超過給定限制,就會觸發(fā)。所以其觸發(fā)機制實現(xiàn)在onElement中

ProcessingTimeTrigger

基于處理時間的觸發(fā)。

EventTimeTrigger

根據(jù) watermarks 度量的事件時間進度進行觸發(fā)。

PurgingTrigger
  • 另一個觸發(fā)器作為參數(shù)作為參數(shù)并將其轉(zhuǎn)換為清除觸發(fā)器。
  • 其作用是在 Trigger 觸發(fā)窗口計算之后將窗口的 State 中的數(shù)據(jù)清除。
  • image-20210202200710573前兩條數(shù)據(jù)先后于20:01和20:02進入窗口,此時 State 中的值更新為3,同時到了Trigger的觸發(fā)時間,輸出結(jié)果為3。
    image-20210202200733128
  • 由于 PurgingTrigger 的作用,State 中的數(shù)據(jù)會被清除。

image-20210202200744793

DeltaTrigger
DeltaTrigger 的應(yīng)用
  • 有這樣一個車輛區(qū)間測試的需求,車輛每分鐘上報當(dāng)前位置與車速,每行進10公里,計算區(qū)間內(nèi)最高車速。






image-20210202200802480

觸發(fā)器原型

  • onElement
  • onProcessingTime
  • onEventTime
  • onMerge
  • clear

說明

  • TriggerResult可以是以下之一
    • CONTINUE 什么都不做
    • FIRE_AND_PURGE 觸發(fā)計算,然后清除窗口中的元素
    • FIRE 觸發(fā)計算 默認(rèn)情況下,內(nèi)置的觸發(fā)器只返回 FIRE,不會清除窗口狀態(tài)。
    • PURGE 清除窗口中的元素
  • 所有的事件時間窗口分配器都有一個 EventTimeTrigger 作為默認(rèn)觸發(fā)器。一旦 watermark 到達(dá)窗口末尾,這個觸發(fā)器就會被觸發(fā)。
  • 全局窗口(GlobalWindow)的默認(rèn)觸發(fā)器是永不會被觸發(fā)的 NeverTrigger。因此,在使用全局窗口時,必須自定義一個觸發(fā)器。
  • 通過使用 trigger() 方法指定觸發(fā)器,將會覆蓋窗口分配器的默認(rèn)觸發(fā)器。例如,如果你為 TumblingEventTimeWindows 指定 CountTrigger,
    那么不會再根據(jù)時間進度觸發(fā)窗口,而只能通過計數(shù)。目前為止,如果你希望基于時間以及計數(shù)進行觸發(fā),則必須編寫自己的自定義觸發(fā)器。

窗口的分類

  • 根據(jù)窗口是否調(diào)用keyBy算子key化,分為被Keys化Windows和非被Keys化Windows;

  • 根據(jù)窗口的驅(qū)動方式,分為時間驅(qū)動(Time Window)、數(shù)據(jù)驅(qū)動(Count Window);
  • 根據(jù)窗口的元素分配方式,分為滾動窗口(tumbling windows)、滑動窗口(sliding windows)、會話窗口(session windows)以及全局窗口(global windows)

被Keys化Windows

可以理解為按照原始數(shù)據(jù)流中的某個key進行分類,擁有同一個key值的數(shù)據(jù)流將為進入同一個window,多個窗口并行的邏輯流

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
Java

非被Keys化Windows

  • 不做分類,每進入一條數(shù)據(jù)即增加一個窗口,多個窗口并行,每個窗口處理1條數(shù)據(jù)
  • WindowAll 將元素按照某種特性聚集在一起,該函數(shù)不支持并行操作,默認(rèn)的并行度就是1,所以如果使用這個算子的話需要注意一下性能問題
    stream
         .windowAll(...)           <-  required: "assigner"
        [.trigger(...)]            <-  optional: "trigger" (else default trigger)
        [.evictor(...)]            <-  optional: "evictor" (else no evictor)
        [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
        [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
         .reduce/aggregate/fold/apply()      <-  required: "function"
        [.getSideOutput(...)]      <-  optional: "output tag"
    Text

區(qū)別

  • 對于被Key化的數(shù)據(jù)流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細(xì)信息)。
  • 擁有被Key化的數(shù)據(jù)流將允許您的窗口計算由多個任務(wù)并行執(zhí)行,因為每個邏輯被Key化的數(shù)據(jù)流可以獨立于其余任務(wù)進行處理。
    引用相同Keys的所有數(shù)據(jù)元將被發(fā)送到同一個并行任務(wù)。

Time-Based window(基于時間的窗口)

每一條記錄來了以后會根據(jù)時間屬性值采用不同的window assinger 方法分配給一個或者多個窗口,分為滾動窗口(Tumbling windows)和滑動窗口(Sliding windows)。

  • EventTime 數(shù)據(jù)本身攜帶的時間,默認(rèn)的時間屬性;
  • ProcessingTime 處理時間;
  • IngestionTime 數(shù)據(jù)進入flink程序的時間;

Tumbling windows(滾動窗口)

滾動窗口下窗口之間不重疊,且窗口長度是固定的。我們可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows創(chuàng)建一個基于Event Time或Processing Time的滾動時間窗口。

tumb-window

下面示例以滾動時間窗口(TumblingEventTimeWindows)為例,默認(rèn)模式是TimeCharacteristic.ProcessingTime處理時間

/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
Java

所以如果使用Event Time即數(shù)據(jù)的實際產(chǎn)生時間,需要通過senv.setStreamTimeCharacteristic指定

// 指定使用數(shù)據(jù)的實際時間
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream input = ...;

// tumbling event-time windows
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .();

// tumbling processing-time windows
input
    .keyBy()
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .();

// 這里減去8小時,表示用UTC世界時間
input
    .keyBy()
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .();
Java

Sliding windows(滑動窗口)

滑動窗口以一個步長(Slide)不斷向前滑動,窗口的長度固定。使用時,我們要設(shè)置Slide和Size。Slide的大小決定了Flink以多大的頻率來創(chuàng)建新的窗口,Slide較小,窗口的個數(shù)會很多。Slide小于窗口的Size時,相鄰窗口會重疊,一個事件會被分配到多個窗口;Slide大于Size,有些事件可能被丟掉。

slide-window

同理,如果是滑動時間窗口,也是類似的:

// 窗口的大小是10s,每5s滑動一次,也就是5s計算一次
.timeWindow(Time.seconds(10), Time.seconds(5))
Java

這里使用的是timeWindow,通常使用window,那么兩者的區(qū)別是什么呢?

timeWindow其實判斷時間的處理模式是ProcessingTime還是SlidingEventTimeWindows,幫我們判斷好了,調(diào)用方法直接傳入(Time size, Time slide)這兩個參數(shù)就好了,如果是使用.window方法,則需要自己來判斷,就是前者寫法更簡單一些。

public WindowedStream timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}
Java

Count-Based window (基于計數(shù)的窗口)

Count Window 是根據(jù)元素個數(shù)對數(shù)據(jù)流進行分組的,也分滾動(tumb)和滑動(slide)。

Tumbling Count Window
當(dāng)我們想要每100個用戶購買行為事件統(tǒng)計購買總數(shù),那么每當(dāng)窗口中填滿100個元素了,就會對窗口進行計算,這種窗口我們稱之為翻滾計數(shù)窗口(Tumbling Count Window),上圖所示窗口大小為3個。通過使用 DataStream API,我們可以這樣實現(xiàn):

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the buyCnt sum 
  .sum(1)
Java

Sliding Count Window
當(dāng)然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計算每10個元素計算一次最近100個元素的總和,代碼示例如下。

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)
Java

會話(session)窗口

  • SessionWindow中的Gap是一個非常重要的概念,它指的是session之間的間隔。
  • 如果session之間的間隔大于指定的間隔,數(shù)據(jù)將會被劃分到不同的session中。比如,設(shè)定5秒的間隔,0-5屬于一個session,5-10屬于另一個session

session-window

DataStream input = ...;

// event-time session windows with static gap
input
    .keyBy()
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .();

// event-time session windows with dynamic gap
input
    .keyBy()
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .();

// processing-time session windows with static gap
input
    .keyBy()
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .();

// processing-time session windows with dynamic gap
input
    .keyBy()
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .();
Java

Global Windows(全局窗口)

global-window

總結(jié)






SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows

  • 基于時間的滑動窗口
    • SlidingEventTimeWindows
    • SlidingProcessingTimeWindows
  • 基于時間的翻滾窗口
    • TumblingEventTimeWindows
    • TumblingProcessingTimeWindows
  • 基于計數(shù)的滑動窗口
    • countWindow(100, 10)
  • 基于計數(shù)的翻滾窗口
    • countWindow(100)
  • 會話窗口
    會話窗口:一條記錄一個窗口
    • ProcessingTimeSessionWindows
    • EventTimeSessionWindows
  • 全局窗口(GlobalWindows)
    • GlobalWindow是一個全局窗口,被實現(xiàn)為單例模式。其maxTimestamp被設(shè)置為Long.MAX_VALUE。
    • 該類內(nèi)部有一個靜態(tài)類定義了GlobalWindow的序列化器:Serializer。

延遲

默認(rèn)情況下,當(dāng)水印超過窗口末尾時,會刪除延遲數(shù)據(jù)元。
但是,F(xiàn)link允許為窗口 算子指定最大允許延遲。允許延遲指定數(shù)據(jù)元在被刪除之前可以延遲多少時間,并且其默認(rèn)值為0.
在水印通過窗口結(jié)束之后但在通過窗口結(jié)束加上允許的延遲之前到達(dá)的數(shù)據(jù)元,仍然添加到窗口中。
根據(jù)使用的觸發(fā)器,延遲但未丟棄的數(shù)據(jù)元可能會導(dǎo)致窗口再次觸發(fā)。就是這種情況EventTimeTrigger。

當(dāng)指定允許的延遲大于0時,在水印通過窗口結(jié)束后保持窗口及其內(nèi)容。在這些情況下,當(dāng)遲到但未掉落的數(shù)據(jù)元到達(dá)時,它可能觸發(fā)窗口的另一次觸發(fā)。
這些射擊被稱為late firings,因為它們是由遲到事件觸發(fā)的,與之相反的main firing 是窗口的第一次射擊。在會話窗口的情況下,后期點火可以進一步導(dǎo)致窗口的合并,因為它們可以“橋接”兩個預(yù)先存在的未合并窗口之間的間隙。
后期觸發(fā)發(fā)出的數(shù)據(jù)元應(yīng)該被視為先前計算的更新結(jié)果,即,您的數(shù)據(jù)流將包含同一計算的多個結(jié)果。根據(jù)您的應(yīng)用程序,您需要考慮這些重復(fù)的結(jié)果或?qū)ζ溥M行重復(fù)數(shù)據(jù)刪除。

窗口的使用

  • Flink為每個窗口創(chuàng)建一個每個數(shù)據(jù)元的副本。鑒于此,翻滾窗口保存每個數(shù)據(jù)元的一個副本(一個數(shù)據(jù)元恰好屬于一個窗口,除非它被延遲)
    動窗口會每個數(shù)據(jù)元創(chuàng)建幾個復(fù)本,如“ 窗口分配器”部分中所述。因此,尺寸為1天且滑動1秒的滑動窗口可能不是一個好主意。
  • ReduceFunction,AggregateFunction并且FoldFunction可以顯著降低存儲要求,因為它們急切地聚合數(shù)據(jù)元并且每個窗口只存儲一個值。
    相反,僅使用 ProcessWindowFunction需要累積所有數(shù)據(jù)元。

Evictor

  • 它剔除元素的時機是:在觸發(fā)器觸發(fā)之后,在窗口被處理(apply windowFunction)之前
  • Flink 的窗口模型允許在窗口分配器和觸發(fā)器之外指定一個可選的驅(qū)逐器(Evictor)??梢允褂?evictor(...) 方法來完成。
    驅(qū)逐器能夠在觸發(fā)器觸發(fā)之后,以及在應(yīng)用窗口函數(shù)之前或之后從窗口中移除元素
  • 默認(rèn)情況下,所有內(nèi)置的驅(qū)逐器在窗口函數(shù)之前使用
  • 指定驅(qū)逐器可以避免預(yù)聚合(pre-aggregation),因為窗口內(nèi)所有元素必須在應(yīng)用計算之前傳遞給驅(qū)逐器。
  • Flink不保證窗口內(nèi)元素的順序。這意味著雖然驅(qū)逐者可以從窗口的開頭移除元素,但這些元素不一定是先到的還是后到的。

內(nèi)置的Evitor

  • TimeEvitor
    • 以毫秒為單位的時間間隔作為參數(shù),對于給定的窗口,找到元素中的最大的時間戳max_ts,并刪除時間戳小于max_ts - interval的所有元素。
    • 本質(zhì)上是將罪行的元素選出來
  • CountEvitor
    • 保持窗口內(nèi)元素數(shù)量符合用戶指定數(shù)量,如果多于用戶指定的數(shù)量,從窗口緩沖區(qū)的開頭丟棄剩余的元素。
  • DeltaEvitor
    • 使用 DeltaFunction和 一個閾值,計算窗口緩沖區(qū)中的最后一個元素與其余每個元素之間的 delta 值,并刪除 delta 值大于或等于閾值的元素。
    • 通過定義的DeltaFunction 和 Threshold ,計算窗口中元素和最新元素的 Delta 值,將Delta 值超過 Threshold的元素刪除

watermark

  • watermark是一種衡量Event Time進展的機制,它是數(shù)據(jù)本身的一個隱藏屬性。
  • watermark Apache Flink為了處理EventTime 窗口計算提出的一種機制,本質(zhì)上也是一種時間戳,
    由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統(tǒng)Event,
    與普通數(shù)據(jù)流Event一樣流轉(zhuǎn)到對應(yīng)的下游算子,接收到Watermark Event的算子以此不斷調(diào)整自己管理的EventTime clock。
    算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數(shù)據(jù)元素到來了,所以Watermark可以看做是告訴Apache Flink框架數(shù)據(jù)流已經(jīng)處理到什么位置(時間維度)的方式。
  • 通?;贓vent Time的數(shù)據(jù),自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結(jié)合window來實現(xiàn)。
  • waterMark 的觸發(fā)時間機制(waterMark >= window_end_time)
    • 當(dāng)?shù)谝淮斡|發(fā)之后,以后所有到達(dá)的該窗口的數(shù)據(jù)(遲到數(shù)據(jù))都會觸發(fā)該窗口
    • 定義允許延遲,所以 waterMark=window_end_time+allowedLateness 是窗口被關(guān)閉,數(shù)據(jù)被丟棄
      • 對于out-of-order的數(shù)據(jù),F(xiàn)link可以通過watermark機制結(jié)合window的操作,來處理一定范圍內(nèi)的亂序數(shù)據(jù),(新進來的數(shù)據(jù))晚于前面進來的數(shù)據(jù),但是該數(shù)據(jù)所在窗口沒有被觸發(fā),
        這個時候數(shù)據(jù)還是有效的——EventTime
    • 對于out-of-order的數(shù)據(jù),延遲太多
    • 注意,如果不定義允許最大遲到時間,并且在有很多數(shù)據(jù)遲到的情況下,會嚴(yán)重影響正確結(jié)果,只要Event Time < watermark時間就會觸發(fā)窗口,也就是說遲到的每一條數(shù)據(jù)都會觸發(fā) 該窗口

產(chǎn)生方式

  • Punctuated
    • 數(shù)據(jù)流中每一個遞增的EventTime都會產(chǎn)生一個Watermark(其實是根據(jù)某個計算條件來做判斷)。
    • 在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
    • 每個事件都會攜帶事件,可以根據(jù)該時間產(chǎn)生一個watermark 或者可以根據(jù)事件攜帶的其他標(biāo)志——業(yè)務(wù)的結(jié)束標(biāo)志
  • Periodic - 周期性的(一定時間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個Watermark。
    在實際的生產(chǎn)中Periodic的方式必須結(jié)合時間和積累條數(shù)兩個維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會有很大的延時。

背景

  • 流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的
  • 但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。
  • 對于late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發(fā)window去進行計算了
  • 它表示當(dāng)達(dá)到watermark到達(dá)之后,在watermark之前的數(shù)據(jù)已經(jīng)全部達(dá)到(即使后面還有延遲的數(shù)據(jù)

解決的問題

  • Watermark的時間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時間戳不等于Event中的EventTime,
    Event中的EventTime自產(chǎn)生那一刻起就不可以改變了,不受Apache Flink框架控制,
    而Watermark的產(chǎn)生是在Apache Flink的Source節(jié)點或?qū)崿F(xiàn)的Watermark生成器計算產(chǎn)生(如上Apache Flink內(nèi)置的 Periodic Watermark實現(xiàn)),
    Apache Flink內(nèi)部對單流或多流的場景有統(tǒng)一的Watermark處理。
    • 默認(rèn)情況下小于watermark 時間戳的event 會被丟棄嗎

多流waterMark

  • 在實際的流計算中往往一個job中會處理多個Source的數(shù)據(jù),對Source的數(shù)據(jù)進行GroupBy分組,那么來自不同Source的相同key值會shuffle到同一個處理節(jié)點,
    并攜帶各自的Watermark,Apache Flink內(nèi)部要保證Watermark要保持單調(diào)遞增,多個Source的Watermark匯聚到一起時候可能不是單調(diào)自增的
  • Apache Flink內(nèi)部實現(xiàn)每一個邊上只能有一個遞增的Watermark, 當(dāng)出現(xiàn)多流攜帶Eventtime匯聚到一起(GroupBy or Union)時候,
    Apache Flink會選擇所有流入的Eventtime中最小的一個向下游流出。從而保證watermark的單調(diào)遞增和保證數(shù)據(jù)的完整性

理解

  • 默認(rèn)情況下watermark 已經(jīng)觸發(fā)過得窗口,即使有新數(shù)據(jù)(遲到)落進去不會被計算 ,遲到的意思
    watermark>=window_n_end_time && window_n_start_time<=vent_time<window_n_end_time(即數(shù)據(jù)屬于這個窗口)
  • 允許遲到
    watermark>=window_n_end_time && watermark

窗口聚合

  • 增量聚合
    • 窗口內(nèi)來一條數(shù)據(jù)就計算一次
  • 全量聚合
    • 一次計算整個窗口里的所有元素(可以進行排序,一次一批可以針對外部鏈接)
    • 使用
      • 窗口之后調(diào)用 apply ,創(chuàng)建的元素里面方法的參數(shù)是一個迭代器

常用的一些方法

  • window
  • timeWindow和 countWind
  • process 和 apply

AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。
簡而言之,前一個接口將會周期性發(fā)送Watermark,而第二個接口根據(jù)一些到達(dá)數(shù)據(jù)的屬性,例如一旦在流中碰到一個特殊的element便發(fā)送Watermark。

自定義窗口

  • Window Assigner:負(fù)責(zé)將元素分配到不同的window。
  • Trigger即觸發(fā)器,定義何時或什么情況下Fire一個window。
    • 對于CountWindow,我們可以直接使用已經(jīng)定義好的Trigger:CountTrigger trigger(CountTrigger.of(2))
  • Evictor(可選) 驅(qū)逐者,即保留上一window留下的某些元素。
  • 最簡單的情況,如果業(yè)務(wù)不是特別復(fù)雜,僅僅是基于Time和Count,我們其實可以用系統(tǒng)定義好的WindowAssigner以及Trigger和Evictor來實現(xiàn)不同的組合:

window 出現(xiàn)數(shù)據(jù)傾斜

  • window 產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過多。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的。出現(xiàn)這種情況一般通過兩種方式來解決:
  • 在數(shù)據(jù)進入窗口前做預(yù)聚合;
  • 重新設(shè)計窗口聚合的 key;






作者:柯廣的網(wǎng)絡(luò)日志

微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫