徹底搞清Flink中的Window機制
窗口
在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個窗口,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進行計算。
Flink 認(rèn)為 Batch 是 Streaming 的一個特例,所以 Flink 底層引擎是一個流式引擎,在上面實現(xiàn)了流處理和批處理。而窗口(window)就是從 Streaming 到 Batch 的一個橋梁。
- 一個Window代表有限對象的集合。一個窗口有一個最大的時間戳,該時間戳意味著在其代表的某時間點——所有應(yīng)該進入這個窗口的元素都已經(jīng)到達(dá)
- Window就是用來對一個無限的流設(shè)置一個有限的集合,在有界的數(shù)據(jù)集上進行操作的一種機制。window又可以分為基于時間(Time-based)的window以及基于數(shù)量(Count-based)的window。
- Flink DataStream API提供了Time和Count的window,同時增加了基于Session的window。同時,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用戶自定義window。
窗口的組成
窗口分配器
- assignWindows將某個帶有時間戳timestamp的元素element分配給一個或多個窗口,并返回窗口集合
- getDefaultTrigger 返回跟WindowAssigner關(guān)聯(lián)的默認(rèn)觸發(fā)器
-
getWindowSerializer返回WindowAssigner分配的窗口的序列化器
- 窗口分配器定義如何將數(shù)據(jù)元分配給窗口。這是通過WindowAssigner 在window(...)(對于被Keys化流)或windowAll()(對于非被Keys化流)調(diào)用中指定您的選擇來完成的。
- WindowAssigner負(fù)責(zé)將每個傳入數(shù)據(jù)元分配給一個或多個窗口。Flink帶有預(yù)定義的窗口分配器,用于最常見的用例
即翻滾窗口, 滑動窗口,會話窗口和全局窗口。 - 您還可以通過擴展WindowAssigner類來實現(xiàn)自定義窗口分配器。
- 所有內(nèi)置窗口分配器(全局窗口除外)都根據(jù)時間為窗口分配數(shù)據(jù)元,這可以是處理時間或事件時間。
State
- 狀態(tài),用來存儲窗口內(nèi)的元素,如果有 AggregateFunction,則存儲的是增量聚合的中間結(jié)果。
窗口函數(shù)
選擇合適的計算函數(shù),減少開發(fā)代碼量提高系統(tǒng)性能
增量聚合函數(shù)(窗口只維護狀態(tài))
- ReduceFunction
- AggregateFunction
- FoldFunction
全量聚合函數(shù)(窗口維護窗口內(nèi)的數(shù)據(jù))
- ProcessWindowFunction
- 全量計算
- 支持功能更加靈活
- 支持狀態(tài)操作
觸發(fā)器
- EventTimeTrigger基于事件時間的觸發(fā)器,對應(yīng)onEventTime
- ProcessingTimeTrigger
基于當(dāng)前系統(tǒng)時間的觸發(fā)器,對應(yīng)onProcessingTime
ProcessingTime 有最好的性能和最低的延遲。但在分布式計算環(huán)境中ProcessingTime具有不確定性,相同數(shù)據(jù)流多次運行有可能產(chǎn)生不同的計算結(jié)果。 - ContinuousEventTimeTrigger
- ContinuousProcessingTimeTrigger
-
CountTrigger
- Trigger確定何時窗口函數(shù)準(zhǔn)備好處理窗口(由窗口分配器形成)。每個都有默認(rèn)值。
如果默認(rèn)觸發(fā)器不符合您的需要,您可以使用指定自定義觸發(fā)器。WindowAssignerTriggertrigger(...) - 觸發(fā)器界面有五種方法可以Trigger對不同的事件做出反應(yīng):
- onElement()為添加到窗口的每個數(shù)據(jù)元調(diào)用該方法。
- onEventTime()在注冊的事件時間計時器觸發(fā)時調(diào)用該方法。
- onProcessingTime()在注冊的處理時間計時器觸發(fā)時調(diào)用該方法。
- 該onMerge()方法與狀態(tài)觸發(fā)器相關(guān),并且當(dāng)它們的相應(yīng)窗口合并時合并兩個觸發(fā)器的狀態(tài),例如當(dāng)使用會話窗口時。
- 最后,該clear()方法在移除相應(yīng)窗口時執(zhí)行所需的任何動作。
- 默認(rèn)觸發(fā)器
- 默認(rèn)觸發(fā)器GlobalWindow是NeverTrigger從不觸發(fā)的。因此,在使用時必須定義自定義觸發(fā)器GlobalWindow。
- 通過使用trigger()您指定觸發(fā)器會覆蓋a的默認(rèn)觸發(fā)器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows則不再根據(jù)時間進度獲取窗口,
而是僅按計數(shù)。現(xiàn)在,如果你想根據(jù)時間和數(shù)量做出反應(yīng),你必須編寫自己的自定義觸發(fā)器。 - event-time窗口分配器都有一個EventTimeTrigger作為默認(rèn)觸發(fā)器。該觸發(fā)器在watermark通過窗口末尾時出發(fā)。
- Trigger確定何時窗口函數(shù)準(zhǔn)備好處理窗口(由窗口分配器形成)。每個都有默認(rèn)值。
觸發(fā)器分類
CountTrigger
一旦窗口中的數(shù)據(jù)元數(shù)量超過給定限制,就會觸發(fā)。所以其觸發(fā)機制實現(xiàn)在onElement中
ProcessingTimeTrigger
基于處理時間的觸發(fā)。
EventTimeTrigger
根據(jù) watermarks 度量的事件時間進度進行觸發(fā)。
PurgingTrigger
- 另一個觸發(fā)器作為參數(shù)作為參數(shù)并將其轉(zhuǎn)換為清除觸發(fā)器。
- 其作用是在 Trigger 觸發(fā)窗口計算之后將窗口的 State 中的數(shù)據(jù)清除。
-
前兩條數(shù)據(jù)先后于20:01和20:02進入窗口,此時 State 中的值更新為3,同時到了Trigger的觸發(fā)時間,輸出結(jié)果為3。
- 由于 PurgingTrigger 的作用,State 中的數(shù)據(jù)會被清除。
DeltaTrigger
DeltaTrigger 的應(yīng)用
- 有這樣一個車輛區(qū)間測試的需求,車輛每分鐘上報當(dāng)前位置與車速,每行進10公里,計算區(qū)間內(nèi)最高車速。
觸發(fā)器原型
- onElement
- onProcessingTime
- onEventTime
- onMerge
- clear
說明
- TriggerResult可以是以下之一
- CONTINUE 什么都不做
- FIRE_AND_PURGE 觸發(fā)計算,然后清除窗口中的元素
- FIRE 觸發(fā)計算 默認(rèn)情況下,內(nèi)置的觸發(fā)器只返回 FIRE,不會清除窗口狀態(tài)。
- PURGE 清除窗口中的元素
- 所有的事件時間窗口分配器都有一個 EventTimeTrigger 作為默認(rèn)觸發(fā)器。一旦 watermark 到達(dá)窗口末尾,這個觸發(fā)器就會被觸發(fā)。
- 全局窗口(GlobalWindow)的默認(rèn)觸發(fā)器是永不會被觸發(fā)的 NeverTrigger。因此,在使用全局窗口時,必須自定義一個觸發(fā)器。
- 通過使用 trigger() 方法指定觸發(fā)器,將會覆蓋窗口分配器的默認(rèn)觸發(fā)器。例如,如果你為 TumblingEventTimeWindows 指定 CountTrigger,
那么不會再根據(jù)時間進度觸發(fā)窗口,而只能通過計數(shù)。目前為止,如果你希望基于時間以及計數(shù)進行觸發(fā),則必須編寫自己的自定義觸發(fā)器。
窗口的分類
- 根據(jù)窗口是否調(diào)用keyBy算子key化,分為被Keys化Windows和非被Keys化Windows;
- 根據(jù)窗口的驅(qū)動方式,分為時間驅(qū)動(Time Window)、數(shù)據(jù)驅(qū)動(Count Window);
- 根據(jù)窗口的元素分配方式,分為滾動窗口(tumbling windows)、滑動窗口(sliding windows)、會話窗口(session windows)以及全局窗口(global windows)
被Keys化Windows
可以理解為按照原始數(shù)據(jù)流中的某個key進行分類,擁有同一個key值的數(shù)據(jù)流將為進入同一個window,多個窗口并行的邏輯流
非被Keys化Windows
- 不做分類,每進入一條數(shù)據(jù)即增加一個窗口,多個窗口并行,每個窗口處理1條數(shù)據(jù)
- WindowAll 將元素按照某種特性聚集在一起,該函數(shù)不支持并行操作,默認(rèn)的并行度就是1,所以如果使用這個算子的話需要注意一下性能問題
區(qū)別
- 對于被Key化的數(shù)據(jù)流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細(xì)信息)。
- 擁有被Key化的數(shù)據(jù)流將允許您的窗口計算由多個任務(wù)并行執(zhí)行,因為每個邏輯被Key化的數(shù)據(jù)流可以獨立于其余任務(wù)進行處理。
引用相同Keys的所有數(shù)據(jù)元將被發(fā)送到同一個并行任務(wù)。
Time-Based window(基于時間的窗口)
每一條記錄來了以后會根據(jù)時間屬性值采用不同的window assinger 方法分配給一個或者多個窗口,分為滾動窗口(Tumbling windows)和滑動窗口(Sliding windows)。
- EventTime 數(shù)據(jù)本身攜帶的時間,默認(rèn)的時間屬性;
- ProcessingTime 處理時間;
- IngestionTime 數(shù)據(jù)進入flink程序的時間;
Tumbling windows(滾動窗口)
滾動窗口下窗口之間不重疊,且窗口長度是固定的。我們可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows創(chuàng)建一個基于Event Time或Processing Time的滾動時間窗口。
下面示例以滾動時間窗口(TumblingEventTimeWindows
)為例,默認(rèn)模式是TimeCharacteristic.ProcessingTime
處理時間
所以如果使用Event Time
即數(shù)據(jù)的實際產(chǎn)生時間,需要通過senv.setStreamTimeCharacteristic
指定
Sliding windows(滑動窗口)
滑動窗口以一個步長(Slide)不斷向前滑動,窗口的長度固定。使用時,我們要設(shè)置Slide和Size。Slide的大小決定了Flink以多大的頻率來創(chuàng)建新的窗口,Slide較小,窗口的個數(shù)會很多。Slide小于窗口的Size時,相鄰窗口會重疊,一個事件會被分配到多個窗口;Slide大于Size,有些事件可能被丟掉。
同理,如果是滑動時間窗口,也是類似的:
這里使用的是timeWindow
,通常使用window
,那么兩者的區(qū)別是什么呢?
timeWindow
其實判斷時間的處理模式是ProcessingTime
還是SlidingEventTimeWindows
,幫我們判斷好了,調(diào)用方法直接傳入(Time size, Time slide)
這兩個參數(shù)就好了,如果是使用.window
方法,則需要自己來判斷,就是前者寫法更簡單一些。
Count-Based window (基于計數(shù)的窗口)
Count Window 是根據(jù)元素個數(shù)對數(shù)據(jù)流進行分組的,也分滾動(tumb)和滑動(slide)。
Tumbling Count Window
當(dāng)我們想要每100個用戶購買行為事件統(tǒng)計購買總數(shù),那么每當(dāng)窗口中填滿100個元素了,就會對窗口進行計算,這種窗口我們稱之為翻滾計數(shù)窗口(Tumbling
Count Window),上圖所示窗口大小為3個。通過使用 DataStream API,我們可以這樣實現(xiàn):
Sliding Count Window
當(dāng)然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計算每10個元素計算一次最近100個元素的總和,代碼示例如下。
會話(session)窗口
- SessionWindow中的Gap是一個非常重要的概念,它指的是session之間的間隔。
- 如果session之間的間隔大于指定的間隔,數(shù)據(jù)將會被劃分到不同的session中。比如,設(shè)定5秒的間隔,0-5屬于一個session,5-10屬于另一個session
Global Windows(全局窗口)
總結(jié)
SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows
- 基于時間的滑動窗口
- SlidingEventTimeWindows
- SlidingProcessingTimeWindows
- 基于時間的翻滾窗口
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
- 基于計數(shù)的滑動窗口
- countWindow(100, 10)
- 基于計數(shù)的翻滾窗口
- countWindow(100)
- 會話窗口
會話窗口:一條記錄一個窗口- ProcessingTimeSessionWindows
- EventTimeSessionWindows
- 全局窗口(GlobalWindows)
- GlobalWindow是一個全局窗口,被實現(xiàn)為單例模式。其maxTimestamp被設(shè)置為Long.MAX_VALUE。
- 該類內(nèi)部有一個靜態(tài)類定義了GlobalWindow的序列化器:Serializer。
延遲
默認(rèn)情況下,當(dāng)水印超過窗口末尾時,會刪除延遲數(shù)據(jù)元。
但是,F(xiàn)link允許為窗口 算子指定最大允許延遲。允許延遲指定數(shù)據(jù)元在被刪除之前可以延遲多少時間,并且其默認(rèn)值為0.
在水印通過窗口結(jié)束之后但在通過窗口結(jié)束加上允許的延遲之前到達(dá)的數(shù)據(jù)元,仍然添加到窗口中。
根據(jù)使用的觸發(fā)器,延遲但未丟棄的數(shù)據(jù)元可能會導(dǎo)致窗口再次觸發(fā)。就是這種情況EventTimeTrigger。
當(dāng)指定允許的延遲大于0時,在水印通過窗口結(jié)束后保持窗口及其內(nèi)容。在這些情況下,當(dāng)遲到但未掉落的數(shù)據(jù)元到達(dá)時,它可能觸發(fā)窗口的另一次觸發(fā)。
這些射擊被稱為late firings,因為它們是由遲到事件觸發(fā)的,與之相反的main firing 是窗口的第一次射擊。在會話窗口的情況下,后期點火可以進一步導(dǎo)致窗口的合并,因為它們可以“橋接”兩個預(yù)先存在的未合并窗口之間的間隙。
后期觸發(fā)發(fā)出的數(shù)據(jù)元應(yīng)該被視為先前計算的更新結(jié)果,即,您的數(shù)據(jù)流將包含同一計算的多個結(jié)果。根據(jù)您的應(yīng)用程序,您需要考慮這些重復(fù)的結(jié)果或?qū)ζ溥M行重復(fù)數(shù)據(jù)刪除。
窗口的使用
- Flink為每個窗口創(chuàng)建一個每個數(shù)據(jù)元的副本。鑒于此,翻滾窗口保存每個數(shù)據(jù)元的一個副本(一個數(shù)據(jù)元恰好屬于一個窗口,除非它被延遲)
動窗口會每個數(shù)據(jù)元創(chuàng)建幾個復(fù)本,如“ 窗口分配器”部分中所述。因此,尺寸為1天且滑動1秒的滑動窗口可能不是一個好主意。 - ReduceFunction,AggregateFunction并且FoldFunction可以顯著降低存儲要求,因為它們急切地聚合數(shù)據(jù)元并且每個窗口只存儲一個值。
相反,僅使用 ProcessWindowFunction需要累積所有數(shù)據(jù)元。
Evictor
- 它剔除元素的時機是:在觸發(fā)器觸發(fā)之后,在窗口被處理(apply windowFunction)之前
- Flink 的窗口模型允許在窗口分配器和觸發(fā)器之外指定一個可選的驅(qū)逐器(Evictor)??梢允褂?evictor(...) 方法來完成。
驅(qū)逐器能夠在觸發(fā)器觸發(fā)之后,以及在應(yīng)用窗口函數(shù)之前或之后從窗口中移除元素 - 默認(rèn)情況下,所有內(nèi)置的驅(qū)逐器在窗口函數(shù)之前使用
- 指定驅(qū)逐器可以避免預(yù)聚合(pre-aggregation),因為窗口內(nèi)所有元素必須在應(yīng)用計算之前傳遞給驅(qū)逐器。
- Flink不保證窗口內(nèi)元素的順序。這意味著雖然驅(qū)逐者可以從窗口的開頭移除元素,但這些元素不一定是先到的還是后到的。
內(nèi)置的Evitor
- TimeEvitor
- 以毫秒為單位的時間間隔作為參數(shù),對于給定的窗口,找到元素中的最大的時間戳max_ts,并刪除時間戳小于max_ts - interval的所有元素。
- 本質(zhì)上是將罪行的元素選出來
- CountEvitor
- 保持窗口內(nèi)元素數(shù)量符合用戶指定數(shù)量,如果多于用戶指定的數(shù)量,從窗口緩沖區(qū)的開頭丟棄剩余的元素。
- DeltaEvitor
- 使用 DeltaFunction和 一個閾值,計算窗口緩沖區(qū)中的最后一個元素與其余每個元素之間的 delta 值,并刪除 delta 值大于或等于閾值的元素。
- 通過定義的DeltaFunction 和 Threshold ,計算窗口中元素和最新元素的 Delta 值,將Delta 值超過 Threshold的元素刪除
watermark
- watermark是一種衡量Event Time進展的機制,它是數(shù)據(jù)本身的一個隱藏屬性。
- watermark Apache Flink為了處理EventTime 窗口計算提出的一種機制,本質(zhì)上也是一種時間戳,
由Apache Flink Source或者自定義的Watermark生成器按照需求Punctuated或者Periodic兩種方式生成的一種系統(tǒng)Event,
與普通數(shù)據(jù)流Event一樣流轉(zhuǎn)到對應(yīng)的下游算子,接收到Watermark Event的算子以此不斷調(diào)整自己管理的EventTime clock。
算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數(shù)據(jù)元素到來了,所以Watermark可以看做是告訴Apache Flink框架數(shù)據(jù)流已經(jīng)處理到什么位置(時間維度)的方式。 - 通?;贓vent Time的數(shù)據(jù),自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結(jié)合window來實現(xiàn)。
- waterMark 的觸發(fā)時間機制(waterMark >= window_end_time)
- 當(dāng)?shù)谝淮斡|發(fā)之后,以后所有到達(dá)的該窗口的數(shù)據(jù)(遲到數(shù)據(jù))都會觸發(fā)該窗口
- 定義允許延遲,所以 waterMark=window_end_time+allowedLateness 是窗口被關(guān)閉,數(shù)據(jù)被丟棄
- 對于out-of-order的數(shù)據(jù),F(xiàn)link可以通過watermark機制結(jié)合window的操作,來處理一定范圍內(nèi)的亂序數(shù)據(jù),(新進來的數(shù)據(jù))晚于前面進來的數(shù)據(jù),但是該數(shù)據(jù)所在窗口沒有被觸發(fā),
這個時候數(shù)據(jù)還是有效的——EventTime
- 對于out-of-order的數(shù)據(jù),F(xiàn)link可以通過watermark機制結(jié)合window的操作,來處理一定范圍內(nèi)的亂序數(shù)據(jù),(新進來的數(shù)據(jù))晚于前面進來的數(shù)據(jù),但是該數(shù)據(jù)所在窗口沒有被觸發(fā),
- 對于out-of-order的數(shù)據(jù),延遲太多
- 注意,如果不定義允許最大遲到時間,并且在有很多數(shù)據(jù)遲到的情況下,會嚴(yán)重影響正確結(jié)果,只要Event Time < watermark時間就會觸發(fā)窗口,也就是說遲到的每一條數(shù)據(jù)都會觸發(fā) 該窗口
產(chǎn)生方式
- Punctuated
- 數(shù)據(jù)流中每一個遞增的EventTime都會產(chǎn)生一個Watermark(其實是根據(jù)某個計算條件來做判斷)。
- 在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
- 每個事件都會攜帶事件,可以根據(jù)該時間產(chǎn)生一個watermark 或者可以根據(jù)事件攜帶的其他標(biāo)志——業(yè)務(wù)的結(jié)束標(biāo)志
-
Periodic - 周期性的(一定時間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個Watermark。
在實際的生產(chǎn)中Periodic的方式必須結(jié)合時間和積累條數(shù)兩個維度繼續(xù)周期性產(chǎn)生Watermark,否則在極端情況下會有很大的延時。
背景
- 流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的
- 但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。
- 對于late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發(fā)window去進行計算了
- 它表示當(dāng)達(dá)到watermark到達(dá)之后,在watermark之前的數(shù)據(jù)已經(jīng)全部達(dá)到(即使后面還有延遲的數(shù)據(jù)
解決的問題
- Watermark的時間戳可以和Event中的EventTime 一致,也可以自己定義任何合理的邏輯使得Watermark的時間戳不等于Event中的EventTime,
Event中的EventTime自產(chǎn)生那一刻起就不可以改變了,不受Apache Flink框架控制,
而Watermark的產(chǎn)生是在Apache Flink的Source節(jié)點或?qū)崿F(xiàn)的Watermark生成器計算產(chǎn)生(如上Apache Flink內(nèi)置的 Periodic Watermark實現(xiàn)),
Apache Flink內(nèi)部對單流或多流的場景有統(tǒng)一的Watermark處理。- 默認(rèn)情況下小于watermark 時間戳的event 會被丟棄嗎
多流waterMark
- 在實際的流計算中往往一個job中會處理多個Source的數(shù)據(jù),對Source的數(shù)據(jù)進行GroupBy分組,那么來自不同Source的相同key值會shuffle到同一個處理節(jié)點,
并攜帶各自的Watermark,Apache Flink內(nèi)部要保證Watermark要保持單調(diào)遞增,多個Source的Watermark匯聚到一起時候可能不是單調(diào)自增的 - Apache Flink內(nèi)部實現(xiàn)每一個邊上只能有一個遞增的Watermark, 當(dāng)出現(xiàn)多流攜帶Eventtime匯聚到一起(GroupBy or Union)時候,
Apache Flink會選擇所有流入的Eventtime中最小的一個向下游流出。從而保證watermark的單調(diào)遞增和保證數(shù)據(jù)的完整性
理解
- 默認(rèn)情況下watermark 已經(jīng)觸發(fā)過得窗口,即使有新數(shù)據(jù)(遲到)落進去不會被計算 ,遲到的意思
watermark>=window_n_end_time && window_n_start_time<=vent_time<window_n_end_time(即數(shù)據(jù)屬于這個窗口)
- 允許遲到
watermark>=window_n_end_time && watermark
窗口聚合
- 增量聚合
- 窗口內(nèi)來一條數(shù)據(jù)就計算一次
- 全量聚合
- 一次計算整個窗口里的所有元素(可以進行排序,一次一批可以針對外部鏈接)
- 使用
- 窗口之后調(diào)用 apply ,創(chuàng)建的元素里面方法的參數(shù)是一個迭代器
常用的一些方法
- window
- timeWindow和 countWind
- process 和 apply
AssignerWithPeriodicWatermarks或接口AssignerWithPunctuatedWatermarks。
簡而言之,前一個接口將會周期性發(fā)送Watermark,而第二個接口根據(jù)一些到達(dá)數(shù)據(jù)的屬性,例如一旦在流中碰到一個特殊的element便發(fā)送Watermark。
自定義窗口
- Window Assigner:負(fù)責(zé)將元素分配到不同的window。
- Trigger即觸發(fā)器,定義何時或什么情況下Fire一個window。
- 對于CountWindow,我們可以直接使用已經(jīng)定義好的Trigger:CountTrigger trigger(CountTrigger.of(2))
- Evictor(可選) 驅(qū)逐者,即保留上一window留下的某些元素。
- 最簡單的情況,如果業(yè)務(wù)不是特別復(fù)雜,僅僅是基于Time和Count,我們其實可以用系統(tǒng)定義好的WindowAssigner以及Trigger和Evictor來實現(xiàn)不同的組合:
window 出現(xiàn)數(shù)據(jù)傾斜
- window 產(chǎn)生數(shù)據(jù)傾斜指的是數(shù)據(jù)在不同的窗口內(nèi)堆積的數(shù)據(jù)量相差過多。本質(zhì)上產(chǎn)生這種情況的原因是數(shù)據(jù)源頭發(fā)送的數(shù)據(jù)量速度不同導(dǎo)致的。出現(xiàn)這種情況一般通過兩種方式來解決:
- 在數(shù)據(jù)進入窗口前做預(yù)聚合;
- 重新設(shè)計窗口聚合的 key;
作者:柯廣的網(wǎng)絡(luò)日志
微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫