Flink之Watermark詳解


在上一篇文章中我們介紹了窗口相關(guān)的內(nèi)容,那么問題來了,比如公司組織春游,規(guī)定周六早晨8:00 ~ 8:30清查人數(shù),人齊則發(fā)車出發(fā),可是總有那么個同學(xué)會睡懶覺遲到,這時候通常也會等待20分鐘,但是不能一直等下去,如果到了20分鐘則認為,想自己在家過周末,不參與春游活動了,不會繼續(xù)等待了,直接出發(fā)。

這種機制跟這里要講的watermark機制是一個意思。指的是,由于網(wǎng)絡(luò)延遲等原因,一條數(shù)據(jù)會遲到計算,比如使用event time來劃分窗口,我們知道窗口中的數(shù)據(jù)是計算一段時間的數(shù)據(jù),如果一個數(shù)據(jù)來晚了,它的時間范圍已經(jīng)不屬于這個窗口了,則會被丟棄,但他的event time實際上是屬于這個窗口的。引入watermark機制則會等待晚到的數(shù)據(jù)一段時間,等待時間到則觸發(fā)計算,如果數(shù)據(jù)延遲很大,通常也會被丟棄或者另外處理。

1. 基本概念是什么

  • Window:Window是處理無界流的關(guān)鍵,Windows將流拆分為一個個有限大小的buckets,可以可以在每一個buckets中進行計算
  • start_time,end_time:當Window時時間窗口的時候,每個window都會有一個開始時間和結(jié)束時間(前開后閉),這個時間是系統(tǒng)時間
  • event-time: 事件發(fā)生時間,是事件發(fā)生所在設(shè)備的當?shù)貢r間,比如一個點擊事件的時間發(fā)生時間,是用戶點擊操作所在的手機或電腦的時間
  • Watermarks:可以把他理解為一個水位線,等于evevtTime - delay(比如規(guī)定為20分鐘),一旦Watermarks大于了某個window的end_time,就會觸發(fā)此window的計算,Watermarks就是用來觸發(fā)window計算的。

推遲窗口觸發(fā)的時間,實現(xiàn)方式:通過當前窗口中最大的eventTime-延遲時間所得到的Watermark與窗口原始觸發(fā)時間進行對比,當Watermark大于窗口原始觸發(fā)時間時則觸發(fā)窗口執(zhí)行!?。∥覀冎?,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event Time順序排列的。

那么此時出現(xiàn)一個問題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發(fā)window去進行計算了,這個特別的機制,就是Watermark。

Watermark是一種衡量Event Time進展的機制。
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結(jié)合window來實現(xiàn)。
數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
Watermark可以理解成一個延遲觸發(fā)機制,我們可以設(shè)置Watermark的延時時長t,每次系統(tǒng)會校驗已經(jīng)到達的數(shù)據(jù)中最大的maxEventTime,然后認定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達,如果有窗口的停止時間等于maxEventTime – t,那么這個窗口被觸發(fā)執(zhí)行。
有序流的Watermarker如下圖所示:(Watermark設(shè)置為0)

亂序流的Watermarker如下圖所示:(Watermark設(shè)置為2)

當Flink接收到數(shù)據(jù)時,會按照一定的規(guī)則去生成Watermark,這條Watermark就等于當前所有到達數(shù)據(jù)中的maxEventTime - 延遲時長,也就是說,Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當前未觸發(fā)的窗口的停止時間要晚,那么就會觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠都不被觸發(fā)。

上圖中,我們設(shè)置的允許最大延遲到達時間為2s,所以時間戳為5s的事件對應(yīng)的Watermark是3s,時間戳為9s的事件的Watermark是7s,如果我們的窗口1是1s-3s,窗口2是4s-6s,那么時間戳為5s的事件到達時的Watermarker恰好觸發(fā)窗口1,時間戳為9s的事件到達時的Watermark觸發(fā)窗口2。

Watermark 就是觸發(fā)前一窗口的“關(guān)窗時間”,一旦觸發(fā)關(guān)門那么以當前時刻為準在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會收入窗中。只要沒有達到水位那么不管現(xiàn)實中的時間推進了多久都不會觸發(fā)關(guān)窗。

2. Watermark的引入

watermark的引入很簡單,對于亂序數(shù)據(jù),最常見的引用方式如下:

datastream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(maxOutOfOrderness)) {
    @Override
    public long extractTimestamp(String element) {
        return Long.valueOf(JSON.parseObject(element).getString("time"));
    }
})
Java

Event Time的使用通常要指定數(shù)據(jù)源中的時間戳,否則程序無法知道事件的事件時間是什么(數(shù)據(jù)源里的數(shù)據(jù)沒有時間戳的話,就只能使用Processing Time了)。
我們看到上面的例子中創(chuàng)建了一個看起來有點復(fù)雜的類,這個匿名類實現(xiàn)的其實就是分配時間戳的接口。Flink暴露了TimestampAssigner接口供我們實現(xiàn),使我們可以自定義如何從事件數(shù)據(jù)中抽取時間戳,必須通過env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)指定通過時間事件EventTime來分配數(shù)據(jù)。

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 必須指定
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Java

MyAssigner有兩種類型

AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks

以上兩個接口都繼承自TimestampAssigner,區(qū)別是

定期水位線(Assigner with periodic watermarks)

上面講述了根據(jù)從事件數(shù)據(jù)中去獲取時間戳設(shè)置水位線,但存在的問題是沒有達到水位線時不管現(xiàn)實中的時間推進了多久都不會觸發(fā)關(guān)窗,所以接下來我們就來介紹下定期水位線(Periodic Watermark)按照固定時間間隔生成新的水位線,不管是否有新的消息抵達,水位線提升的時間間隔是由用戶設(shè)置的,在兩次水位線提升時隔內(nèi)會有一部分消息流入,用戶可以根據(jù)這部分數(shù)據(jù)來計算出新的水位線。舉個例子,最簡單的水位線算法就是取目前為止最大的事件時間,然而這種方式比較暴力,對亂序事件的容忍程度比較低,容易出現(xiàn)大量遲到事件。

應(yīng)用定期水位線需要實現(xiàn)AssignerWithPeriodicWatermarks API,以下是 Flink 1.9 官網(wǎng)提供的定期水位線的實現(xiàn)例子。

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        // 當前最大時間戳減去maxOutOfOrderness,就是watermark
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
Java

其中extractTimestamp用于從消息中提取事件時間,而getCurrentWatermark用于生成新的水位線,新的水位線只有大于當前水位線才是有效的。每個窗口都會有該類的一個實例,因此可以利用實例的成員變量保存狀態(tài),比如上例中的當前最大時間戳






注:周期性的(一定時間間隔或者達到一定的記錄條數(shù))產(chǎn)生一個Watermark。在實際的生產(chǎn)中Periodic的方式必須結(jié)合時間和積累條數(shù)兩個維度繼續(xù)周期性(默認200ms)產(chǎn)生Watermark,否則在極端情況下會有很大的延時。

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        // 如果是ProcessingTime,那么默認時間間隔是0,一直不會過濾時間。
        getConfig().setAutoWatermarkInterval(0);
    } else {
        // 如果是EventTime,則autoWatermarkInterval設(shè)置為200ms
        getConfig().setAutoWatermarkInterval(200);
    }
}
Java

深入到assignTimestampsAndWatermarks里面,TimestampsAndPeriodicWatermarksOperator有一個定時回調(diào)任務(wù):

@Override
public void open() throws Exception {
    super.open();

    currentWatermark = Long.MIN_VALUE;
    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

    if (watermarkInterval > 0) {
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
}
Java

里面大家感興趣可以繼續(xù)看一下,定時回調(diào)的方法,將符合要求的watermark發(fā)送出去并且注冊下一個定時器。

標點水位線(Assigner with punctuated watermarks)

標點水位線(Punctuated Watermark)通過數(shù)據(jù)流中某些特殊標記事件來觸發(fā)新水位線的生成。這種方式下窗口的觸發(fā)與時間無關(guān),而是決定于何時收到標記事件。
應(yīng)用標點水位線需要實現(xiàn)AssignerWithPunctuatedWatermarks API,以下是 Flink 1.9 官網(wǎng)提供的標點水位線的實現(xiàn)例子。

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
Java

其中extractTimestamp用于從消息中提取事件時間,checkAndGetNextWatermark用于檢查事件是否標點事件,若是則生成新的水位線。不同于定期水位線定時調(diào)用getCurrentWatermark,標點水位線是每接受一個事件就需要調(diào)用checkAndGetNextWatermark,若返回值非 null 且新水位線大于當前水位線,則觸發(fā)窗口計算

注:數(shù)據(jù)流中每一個遞增的EventTime都會產(chǎn)生一個Watermark。在實際的生產(chǎn)中Punctuated方式在TPS很高的場景下會產(chǎn)生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。

遲到事件

雖說水位線表明著早于它的事件不應(yīng)該再出現(xiàn),但是上如上文所講,接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計,導(dǎo)致窗口在它們到達之前已經(jīng)關(guān)閉。
遲到事件出現(xiàn)時窗口已經(jīng)關(guān)閉并產(chǎn)出了計算結(jié)果,因此處理的方法有3種:

  • 重新激活已經(jīng)關(guān)閉的窗口并重新計算以修正結(jié)果。
  • 將遲到事件收集起來另外處理。
  • 將遲到事件視為錯誤消息并丟棄。

Flink 默認的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。

Side Output機制可以將遲到事件單獨放入一個數(shù)據(jù)流分支,這會作為 window 計算結(jié)果的副產(chǎn)品,以便用戶獲取并對其進行特殊處理。

Allowed Lateness機制允許用戶設(shè)置一個允許的最大遲到時長。Flink 會再窗口關(guān)閉后一直保存窗口的狀態(tài)直至超過允許遲到時長,這期間的遲到事件不會被丟棄,而是默認會觸發(fā)窗口重新計算。因為保存窗口狀態(tài)需要額外內(nèi)存,并且如果窗口計算使用了 ProcessWindowFunction API 還可能使得每個遲到事件觸發(fā)一次窗口的全量計算,代價比較大,所以允許遲到時長不宜設(shè)得太長,遲到事件也不宜過多,否則應(yīng)該考慮降低水位線提高的速度或者調(diào)整算法。

// 側(cè)輸出
SingleOutputStreamOperator<Tuple2<String, Long>> lateOutputTag = new OutputTag<>("lateOutputTag");

DataStream<Tuple2<String, Long>> dataStream = senv.addSource(
        new FlinkKafkaConsumer010<>(
                config.get("kafka-topic"),
                new SimpleStringSchema(),
                kafkaProps
        ))
        //設(shè)置watermark
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(maxOutOfOrderness)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.valueOf(JSON.parseObject(element).getString("time"));
            }
        }).map(x -> {
            JSONObject message = JSON.parseObject(x);
            return Tuple2.of(message.getString("name"), 1L);
        })
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(value -> value.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        // 窗口會等待5s
        .allowedLateness(Time.milliseconds(5000))
        // 另外收集起來
        .sideOutputLateData(lateOutputTag)
        .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {

            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {

                long count = 0;
                for(Tuple2<String, Long> element : elements){
                    count = count + 1;
                }

                out.collect(Tuple2.of("pv-" + key, count));
            }
        });

// 獲取遲到數(shù)據(jù)并寫入對應(yīng)Sink
dataStream.getSideOutput(lateOutputTag).addSink(new RichSinkFunction<Tuple2<String, Long>>() {
...
});
Java

每個Kafka分區(qū)的Timestamp

當使用Apache Kafka座位數(shù)據(jù)源時,每個Kafka分區(qū)可能有一個簡單的事件時間模式(遞增的timestamp或者有界的無序)。然而,當消費Kafka中的數(shù)據(jù)時,多個分區(qū)通常是并發(fā)進行的,將事件從分區(qū)中分離開來,并銷毀分區(qū)模式(這是Kafka consumer客戶端固有的工作模式)。
在這種情況下,你可以使用Flink的 Kafka-partition-aware(譯作:Kafka分區(qū)識別或者Kafka分區(qū)敏感)水印生成,使用這個特性,水印會在Kafka消費端的每個分區(qū)中生成,并且每個分區(qū)的水印會在stream shuffle中進行合并。
例如:如果每個Kafka分區(qū)中的事件timestamp是嚴格遞增的話,使用ascending timestamps watermark generator(遞增時間戳水印生成器)將會得到完美的整體水印。
下圖展示了如何使用per-kafka-partition水印生成,以及水印是如何在流式數(shù)據(jù)流中傳播的。

FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);


作者:柯廣的網(wǎng)絡(luò)日志

微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫