Flink之Watermark詳解
在上一篇文章中我們介紹了窗口相關(guān)的內(nèi)容,那么問(wèn)題來(lái)了,比如公司組織春游,規(guī)定周六早晨8:00 ~ 8:30清查人數(shù),人齊則發(fā)車出發(fā),可是總有那么個(gè)同學(xué)會(huì)睡懶覺(jué)遲到,這時(shí)候通常也會(huì)等待20分鐘,但是不能一直等下去,如果到了20分鐘則認(rèn)為,想自己在家過(guò)周末,不參與春游活動(dòng)了,不會(huì)繼續(xù)等待了,直接出發(fā)。
這種機(jī)制跟這里要講的watermark機(jī)制是一個(gè)意思。指的是,由于網(wǎng)絡(luò)延遲等原因,一條數(shù)據(jù)會(huì)遲到計(jì)算,比如使用event time
來(lái)劃分窗口,我們知道窗口中的數(shù)據(jù)是計(jì)算一段時(shí)間的數(shù)據(jù),如果一個(gè)數(shù)據(jù)來(lái)晚了,它的時(shí)間范圍已經(jīng)不屬于這個(gè)窗口了,則會(huì)被丟棄,但他的event time
實(shí)際上是屬于這個(gè)窗口的。引入watermark機(jī)制則會(huì)等待晚到的數(shù)據(jù)一段時(shí)間,等待時(shí)間到則觸發(fā)計(jì)算,如果數(shù)據(jù)延遲很大,通常也會(huì)被丟棄或者另外處理。
1. 基本概念是什么
- Window:Window是處理無(wú)界流的關(guān)鍵,Windows將流拆分為一個(gè)個(gè)有限大小的buckets,可以可以在每一個(gè)buckets中進(jìn)行計(jì)算
- start_time,end_time:當(dāng)Window時(shí)時(shí)間窗口的時(shí)候,每個(gè)window都會(huì)有一個(gè)開(kāi)始時(shí)間和結(jié)束時(shí)間(前開(kāi)后閉),這個(gè)時(shí)間是系統(tǒng)時(shí)間
- event-time: 事件發(fā)生時(shí)間,是事件發(fā)生所在設(shè)備的當(dāng)?shù)貢r(shí)間,比如一個(gè)點(diǎn)擊事件的時(shí)間發(fā)生時(shí)間,是用戶點(diǎn)擊操作所在的手機(jī)或電腦的時(shí)間
- Watermarks:可以把他理解為一個(gè)水位線,等于evevtTime - delay(比如規(guī)定為20分鐘),一旦Watermarks大于了某個(gè)window的end_time,就會(huì)觸發(fā)此window的計(jì)算,Watermarks就是用來(lái)觸發(fā)window計(jì)算的。
推遲窗口觸發(fā)的時(shí)間,實(shí)現(xiàn)方式:通過(guò)當(dāng)前窗口中最大的eventTime-延遲時(shí)間所得到的Watermark與窗口原始觸發(fā)時(shí)間進(jìn)行對(duì)比,當(dāng)Watermark大于窗口原始觸發(fā)時(shí)間時(shí)則觸發(fā)窗口執(zhí)行?。?!我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event
Time順序排列的。
那么此時(shí)出現(xiàn)一個(gè)問(wèn)題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無(wú)限期的等下去,此時(shí)必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)制,就是Watermark。
Watermark是一種衡量Event Time進(jìn)展的機(jī)制。
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。
數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置Watermark的延時(shí)時(shí)長(zhǎng)t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime
- t的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t,那么這個(gè)窗口被觸發(fā)執(zhí)行。
有序流的Watermarker如下圖所示:(Watermark設(shè)置為0)
亂序流的Watermarker如下圖所示:(Watermark設(shè)置為2)
當(dāng)Flink接收到數(shù)據(jù)時(shí),會(huì)按照一定的規(guī)則去生成Watermark,這條Watermark就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的maxEventTime - 延遲時(shí)長(zhǎng),也就是說(shuō),Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,那么就會(huì)觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行過(guò)程中無(wú)法獲取新的數(shù)據(jù),那么沒(méi)有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
上圖中,我們?cè)O(shè)置的允許最大延遲到達(dá)時(shí)間為2s,所以時(shí)間戳為5s的事件對(duì)應(yīng)的Watermark是3s,時(shí)間戳為9s的事件的Watermark是7s,如果我們的窗口1是1s-3s,窗口2是4s-6s,那么時(shí)間戳為5s的事件到達(dá)時(shí)的Watermarker恰好觸發(fā)窗口1,時(shí)間戳為9s的事件到達(dá)時(shí)的Watermark觸發(fā)窗口2。
Watermark 就是觸發(fā)前一窗口的“關(guān)窗時(shí)間”,一旦觸發(fā)關(guān)門(mén)那么以當(dāng)前時(shí)刻為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會(huì)收入窗中。只要沒(méi)有達(dá)到水位那么不管現(xiàn)實(shí)中的時(shí)間推進(jìn)了多久都不會(huì)觸發(fā)關(guān)窗。
2. Watermark的引入
watermark的引入很簡(jiǎn)單,對(duì)于亂序數(shù)據(jù),最常見(jiàn)的引用方式如下:
Event Time的使用通常要指定數(shù)據(jù)源中的時(shí)間戳,否則程序無(wú)法知道事件的事件時(shí)間是什么(數(shù)據(jù)源里的數(shù)據(jù)沒(méi)有時(shí)間戳的話,就只能使用Processing Time了)。
我們看到上面的例子中創(chuàng)建了一個(gè)看起來(lái)有點(diǎn)復(fù)雜的類,這個(gè)匿名類實(shí)現(xiàn)的其實(shí)就是分配時(shí)間戳的接口。Flink暴露了TimestampAssigner接口供我們實(shí)現(xiàn),使我們可以自定義如何從事件數(shù)據(jù)中抽取時(shí)間戳,必須通過(guò)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
指定通過(guò)時(shí)間事件EventTime來(lái)分配數(shù)據(jù)。
MyAssigner有兩種類型
AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks
以上兩個(gè)接口都繼承自TimestampAssigner,區(qū)別是
定期水位線(Assigner with periodic watermarks)
上面講述了根據(jù)從事件數(shù)據(jù)中去獲取時(shí)間戳設(shè)置水位線,但存在的問(wèn)題是沒(méi)有達(dá)到水位線時(shí)不管現(xiàn)實(shí)中的時(shí)間推進(jìn)了多久都不會(huì)觸發(fā)關(guān)窗,所以接下來(lái)我們就來(lái)介紹下定期水位線(Periodic Watermark)按照固定時(shí)間間隔生成新的水位線,不管是否有新的消息抵達(dá),水位線提升的時(shí)間間隔是由用戶設(shè)置的,在兩次水位線提升時(shí)隔內(nèi)會(huì)有一部分消息流入,用戶可以根據(jù)這部分?jǐn)?shù)據(jù)來(lái)計(jì)算出新的水位線。舉個(gè)例子,最簡(jiǎn)單的水位線算法就是取目前為止最大的事件時(shí)間,然而這種方式比較暴力,對(duì)亂序事件的容忍程度比較低,容易出現(xiàn)大量遲到事件。
應(yīng)用定期水位線需要實(shí)現(xiàn)AssignerWithPeriodicWatermarks API,以下是 Flink 1.9 官網(wǎng)提供的定期水位線的實(shí)現(xiàn)例子。
其中extractTimestamp用于從消息中提取事件時(shí)間,而getCurrentWatermark用于生成新的水位線,新的水位線只有大于當(dāng)前水位線才是有效的。每個(gè)窗口都會(huì)有該類的一個(gè)實(shí)例,因此可以利用實(shí)例的成員變量保存狀態(tài),比如上例中的當(dāng)前最大時(shí)間戳
注:周期性的(一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個(gè)Watermark。在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個(gè)維度繼續(xù)周期性(默認(rèn)200ms)產(chǎn)生Watermark,否則在極端情況下會(huì)有很大的延時(shí)。
深入到assignTimestampsAndWatermarks
里面,TimestampsAndPeriodicWatermarksOperator
有一個(gè)定時(shí)回調(diào)任務(wù):
里面大家感興趣可以繼續(xù)看一下,定時(shí)回調(diào)的方法,將符合要求的watermark發(fā)送出去并且注冊(cè)下一個(gè)定時(shí)器。
標(biāo)點(diǎn)水位線(Assigner with punctuated watermarks)
標(biāo)點(diǎn)水位線(Punctuated Watermark)通過(guò)數(shù)據(jù)流中某些特殊標(biāo)記事件來(lái)觸發(fā)新水位線的生成。這種方式下窗口的觸發(fā)與時(shí)間無(wú)關(guān),而是決定于何時(shí)收到標(biāo)記事件。
應(yīng)用標(biāo)點(diǎn)水位線需要實(shí)現(xiàn)AssignerWithPunctuatedWatermarks API,以下是 Flink 1.9 官網(wǎng)提供的標(biāo)點(diǎn)水位線的實(shí)現(xiàn)例子。
其中extractTimestamp用于從消息中提取事件時(shí)間,checkAndGetNextWatermark用于檢查事件是否標(biāo)點(diǎn)事件,若是則生成新的水位線。不同于定期水位線定時(shí)調(diào)用getCurrentWatermark,標(biāo)點(diǎn)水位線是每接受一個(gè)事件就需要調(diào)用checkAndGetNextWatermark,若返回值非 null 且新水位線大于當(dāng)前水位線,則觸發(fā)窗口計(jì)算
注:數(shù)據(jù)流中每一個(gè)遞增的EventTime都會(huì)產(chǎn)生一個(gè)Watermark。在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成。
遲到事件
雖說(shuō)水位線表明著早于它的事件不應(yīng)該再出現(xiàn),但是上如上文所講,接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì),導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉。
遲到事件出現(xiàn)時(shí)窗口已經(jīng)關(guān)閉并產(chǎn)出了計(jì)算結(jié)果,因此處理的方法有3種:
- 重新激活已經(jīng)關(guān)閉的窗口并重新計(jì)算以修正結(jié)果。
- 將遲到事件收集起來(lái)另外處理。
- 將遲到事件視為錯(cuò)誤消息并丟棄。
Flink 默認(rèn)的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。
Side Output機(jī)制可以將遲到事件單獨(dú)放入一個(gè)數(shù)據(jù)流分支,這會(huì)作為 window 計(jì)算結(jié)果的副產(chǎn)品,以便用戶獲取并對(duì)其進(jìn)行特殊處理。
Allowed Lateness機(jī)制允許用戶設(shè)置一個(gè)允許的最大遲到時(shí)長(zhǎng)。Flink 會(huì)再窗口關(guān)閉后一直保存窗口的狀態(tài)直至超過(guò)允許遲到時(shí)長(zhǎng),這期間的遲到事件不會(huì)被丟棄,而是默認(rèn)會(huì)觸發(fā)窗口重新計(jì)算。因?yàn)楸4娲翱跔顟B(tài)需要額外內(nèi)存,并且如果窗口計(jì)算使用了 ProcessWindowFunction API 還可能使得每個(gè)遲到事件觸發(fā)一次窗口的全量計(jì)算,代價(jià)比較大,所以允許遲到時(shí)長(zhǎng)不宜設(shè)得太長(zhǎng),遲到事件也不宜過(guò)多,否則應(yīng)該考慮降低水位線提高的速度或者調(diào)整算法。
每個(gè)Kafka分區(qū)的Timestamp
當(dāng)使用Apache
Kafka座位數(shù)據(jù)源時(shí),每個(gè)Kafka分區(qū)可能有一個(gè)簡(jiǎn)單的事件時(shí)間模式(遞增的timestamp或者有界的無(wú)序)。然而,當(dāng)消費(fèi)Kafka中的數(shù)據(jù)時(shí),多個(gè)分區(qū)通常是并發(fā)進(jìn)行的,將事件從分區(qū)中分離開(kāi)來(lái),并銷毀分區(qū)模式(這是Kafka
consumer客戶端固有的工作模式)。
在這種情況下,你可以使用Flink的
Kafka-partition-aware(譯作:Kafka分區(qū)識(shí)別或者Kafka分區(qū)敏感)水印生成,使用這個(gè)特性,水印會(huì)在Kafka消費(fèi)端的每個(gè)分區(qū)中生成,并且每個(gè)分區(qū)的水印會(huì)在stream
shuffle中進(jìn)行合并。
例如:如果每個(gè)Kafka分區(qū)中的事件timestamp是嚴(yán)格遞增的話,使用ascending timestamps watermark generator(遞增時(shí)間戳水印生成器)將會(huì)得到完美的整體水印。
下圖展示了如何使用per-kafka-partition水印生成,以及水印是如何在流式數(shù)據(jù)流中傳播的。
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);
作者:柯廣的網(wǎng)絡(luò)日志
微信公眾號(hào):Java大數(shù)據(jù)與數(shù)據(jù)倉(cāng)庫(kù)