Flink之Watermark詳解
在上一篇文章中我們介紹了窗口相關(guān)的內(nèi)容,那么問題來了,比如公司組織春游,規(guī)定周六早晨8:00 ~ 8:30清查人數(shù),人齊則發(fā)車出發(fā),可是總有那么個同學(xué)會睡懶覺遲到,這時候通常也會等待20分鐘,但是不能一直等下去,如果到了20分鐘則認為,想自己在家過周末,不參與春游活動了,不會繼續(xù)等待了,直接出發(fā)。
這種機制跟這里要講的watermark機制是一個意思。指的是,由于網(wǎng)絡(luò)延遲等原因,一條數(shù)據(jù)會遲到計算,比如使用event time
來劃分窗口,我們知道窗口中的數(shù)據(jù)是計算一段時間的數(shù)據(jù),如果一個數(shù)據(jù)來晚了,它的時間范圍已經(jīng)不屬于這個窗口了,則會被丟棄,但他的event time
實際上是屬于這個窗口的。引入watermark機制則會等待晚到的數(shù)據(jù)一段時間,等待時間到則觸發(fā)計算,如果數(shù)據(jù)延遲很大,通常也會被丟棄或者另外處理。
1. 基本概念是什么
- Window:Window是處理無界流的關(guān)鍵,Windows將流拆分為一個個有限大小的buckets,可以可以在每一個buckets中進行計算
- start_time,end_time:當Window時時間窗口的時候,每個window都會有一個開始時間和結(jié)束時間(前開后閉),這個時間是系統(tǒng)時間
- event-time: 事件發(fā)生時間,是事件發(fā)生所在設(shè)備的當?shù)貢r間,比如一個點擊事件的時間發(fā)生時間,是用戶點擊操作所在的手機或電腦的時間
- Watermarks:可以把他理解為一個水位線,等于evevtTime - delay(比如規(guī)定為20分鐘),一旦Watermarks大于了某個window的end_time,就會觸發(fā)此window的計算,Watermarks就是用來觸發(fā)window計算的。
推遲窗口觸發(fā)的時間,實現(xiàn)方式:通過當前窗口中最大的eventTime-延遲時間所得到的Watermark與窗口原始觸發(fā)時間進行對比,當Watermark大于窗口原始觸發(fā)時間時則觸發(fā)窗口執(zhí)行!?。∥覀冎?,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event
Time順序排列的。
那么此時出現(xiàn)一個問題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發(fā)window去進行計算了,這個特別的機制,就是Watermark。
Watermark是一種衡量Event Time進展的機制。
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結(jié)合window來實現(xiàn)。
數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
Watermark可以理解成一個延遲觸發(fā)機制,我們可以設(shè)置Watermark的延時時長t,每次系統(tǒng)會校驗已經(jīng)到達的數(shù)據(jù)中最大的maxEventTime,然后認定eventTime小于maxEventTime
- t的所有數(shù)據(jù)都已經(jīng)到達,如果有窗口的停止時間等于maxEventTime – t,那么這個窗口被觸發(fā)執(zhí)行。
有序流的Watermarker如下圖所示:(Watermark設(shè)置為0)
亂序流的Watermarker如下圖所示:(Watermark設(shè)置為2)
當Flink接收到數(shù)據(jù)時,會按照一定的規(guī)則去生成Watermark,這條Watermark就等于當前所有到達數(shù)據(jù)中的maxEventTime - 延遲時長,也就是說,Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當前未觸發(fā)的窗口的停止時間要晚,那么就會觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠都不被觸發(fā)。
上圖中,我們設(shè)置的允許最大延遲到達時間為2s,所以時間戳為5s的事件對應(yīng)的Watermark是3s,時間戳為9s的事件的Watermark是7s,如果我們的窗口1是1s-3s,窗口2是4s-6s,那么時間戳為5s的事件到達時的Watermarker恰好觸發(fā)窗口1,時間戳為9s的事件到達時的Watermark觸發(fā)窗口2。
Watermark 就是觸發(fā)前一窗口的“關(guān)窗時間”,一旦觸發(fā)關(guān)門那么以當前時刻為準在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會收入窗中。只要沒有達到水位那么不管現(xiàn)實中的時間推進了多久都不會觸發(fā)關(guān)窗。
2. Watermark的引入
watermark的引入很簡單,對于亂序數(shù)據(jù),最常見的引用方式如下:
Event Time的使用通常要指定數(shù)據(jù)源中的時間戳,否則程序無法知道事件的事件時間是什么(數(shù)據(jù)源里的數(shù)據(jù)沒有時間戳的話,就只能使用Processing Time了)。
我們看到上面的例子中創(chuàng)建了一個看起來有點復(fù)雜的類,這個匿名類實現(xiàn)的其實就是分配時間戳的接口。Flink暴露了TimestampAssigner接口供我們實現(xiàn),使我們可以自定義如何從事件數(shù)據(jù)中抽取時間戳,必須通過env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
指定通過時間事件EventTime來分配數(shù)據(jù)。
MyAssigner有兩種類型
AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks
以上兩個接口都繼承自TimestampAssigner,區(qū)別是
定期水位線(Assigner with periodic watermarks)
上面講述了根據(jù)從事件數(shù)據(jù)中去獲取時間戳設(shè)置水位線,但存在的問題是沒有達到水位線時不管現(xiàn)實中的時間推進了多久都不會觸發(fā)關(guān)窗,所以接下來我們就來介紹下定期水位線(Periodic Watermark)按照固定時間間隔生成新的水位線,不管是否有新的消息抵達,水位線提升的時間間隔是由用戶設(shè)置的,在兩次水位線提升時隔內(nèi)會有一部分消息流入,用戶可以根據(jù)這部分數(shù)據(jù)來計算出新的水位線。舉個例子,最簡單的水位線算法就是取目前為止最大的事件時間,然而這種方式比較暴力,對亂序事件的容忍程度比較低,容易出現(xiàn)大量遲到事件。
應(yīng)用定期水位線需要實現(xiàn)AssignerWithPeriodicWatermarks API,以下是 Flink 1.9 官網(wǎng)提供的定期水位線的實現(xiàn)例子。
其中extractTimestamp用于從消息中提取事件時間,而getCurrentWatermark用于生成新的水位線,新的水位線只有大于當前水位線才是有效的。每個窗口都會有該類的一個實例,因此可以利用實例的成員變量保存狀態(tài),比如上例中的當前最大時間戳
注:周期性的(一定時間間隔或者達到一定的記錄條數(shù))產(chǎn)生一個Watermark。在實際的生產(chǎn)中Periodic的方式必須結(jié)合時間和積累條數(shù)兩個維度繼續(xù)周期性(默認200ms)產(chǎn)生Watermark,否則在極端情況下會有很大的延時。
深入到assignTimestampsAndWatermarks
里面,TimestampsAndPeriodicWatermarksOperator
有一個定時回調(diào)任務(wù):
里面大家感興趣可以繼續(xù)看一下,定時回調(diào)的方法,將符合要求的watermark發(fā)送出去并且注冊下一個定時器。
標點水位線(Assigner with punctuated watermarks)
標點水位線(Punctuated Watermark)通過數(shù)據(jù)流中某些特殊標記事件來觸發(fā)新水位線的生成。這種方式下窗口的觸發(fā)與時間無關(guān),而是決定于何時收到標記事件。
應(yīng)用標點水位線需要實現(xiàn)AssignerWithPunctuatedWatermarks API,以下是 Flink 1.9 官網(wǎng)提供的標點水位線的實現(xiàn)例子。
其中extractTimestamp用于從消息中提取事件時間,checkAndGetNextWatermark用于檢查事件是否標點事件,若是則生成新的水位線。不同于定期水位線定時調(diào)用getCurrentWatermark,標點水位線是每接受一個事件就需要調(diào)用checkAndGetNextWatermark,若返回值非 null 且新水位線大于當前水位線,則觸發(fā)窗口計算
注:數(shù)據(jù)流中每一個遞增的EventTime都會產(chǎn)生一個Watermark。在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。
遲到事件
雖說水位線表明著早于它的事件不應(yīng)該再出現(xiàn),但是上如上文所講,接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計,導(dǎo)致窗口在它們到達之前已經(jīng)關(guān)閉。
遲到事件出現(xiàn)時窗口已經(jīng)關(guān)閉并產(chǎn)出了計算結(jié)果,因此處理的方法有3種:
- 重新激活已經(jīng)關(guān)閉的窗口并重新計算以修正結(jié)果。
- 將遲到事件收集起來另外處理。
- 將遲到事件視為錯誤消息并丟棄。
Flink 默認的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。
Side Output機制可以將遲到事件單獨放入一個數(shù)據(jù)流分支,這會作為 window 計算結(jié)果的副產(chǎn)品,以便用戶獲取并對其進行特殊處理。
Allowed Lateness機制允許用戶設(shè)置一個允許的最大遲到時長。Flink 會再窗口關(guān)閉后一直保存窗口的狀態(tài)直至超過允許遲到時長,這期間的遲到事件不會被丟棄,而是默認會觸發(fā)窗口重新計算。因為保存窗口狀態(tài)需要額外內(nèi)存,并且如果窗口計算使用了 ProcessWindowFunction API 還可能使得每個遲到事件觸發(fā)一次窗口的全量計算,代價比較大,所以允許遲到時長不宜設(shè)得太長,遲到事件也不宜過多,否則應(yīng)該考慮降低水位線提高的速度或者調(diào)整算法。
每個Kafka分區(qū)的Timestamp
當使用Apache
Kafka座位數(shù)據(jù)源時,每個Kafka分區(qū)可能有一個簡單的事件時間模式(遞增的timestamp或者有界的無序)。然而,當消費Kafka中的數(shù)據(jù)時,多個分區(qū)通常是并發(fā)進行的,將事件從分區(qū)中分離開來,并銷毀分區(qū)模式(這是Kafka
consumer客戶端固有的工作模式)。
在這種情況下,你可以使用Flink的
Kafka-partition-aware(譯作:Kafka分區(qū)識別或者Kafka分區(qū)敏感)水印生成,使用這個特性,水印會在Kafka消費端的每個分區(qū)中生成,并且每個分區(qū)的水印會在stream
shuffle中進行合并。
例如:如果每個Kafka分區(qū)中的事件timestamp是嚴格遞增的話,使用ascending timestamps watermark generator(遞增時間戳水印生成器)將會得到完美的整體水印。
下圖展示了如何使用per-kafka-partition水印生成,以及水印是如何在流式數(shù)據(jù)流中傳播的。
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);
作者:柯廣的網(wǎng)絡(luò)日志
微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫