Flink實時計算topN熱榜
TopN的常見應(yīng)用場景,最熱商品購買量,最高人氣作者的閱讀量等等。
1. 用到的知識點
- Flink創(chuàng)建kafka數(shù)據(jù)源;
- 基于 EventTime 處理,如何指定 Watermark;
- Flink中的Window,滾動(tumbling)窗口與滑動(sliding)窗口;
- State狀態(tài)的使用;
- ProcessFunction 實現(xiàn) TopN 功能;
2. 案例介紹
通過用戶訪問日志,計算最近一段時間平臺最活躍的幾位用戶topN。
- 創(chuàng)建kafka生產(chǎn)者,發(fā)送測試數(shù)據(jù)到kafka;
- 消費kafka數(shù)據(jù),使用滑動(sliding)窗口,每隔一段時間更新一次排名;
3. 數(shù)據(jù)源
這里使用kafka api發(fā)送測試數(shù)據(jù)到kafka,代碼如下:
這里通過隨機數(shù)來擾亂username,便于使用戶名大小不一,讓結(jié)果更加明顯。KafkaUtil是自己寫的一個kafka工具類,代碼很簡單,主要是平時做測試方便。
4. 主要程序
創(chuàng)建一個main程序,開始編寫代碼。
創(chuàng)建flink環(huán)境,關(guān)聯(lián)kafka數(shù)據(jù)源。
EventTime 與 Watermark
設(shè)置屬性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
,表示按照數(shù)據(jù)時間字段來處理,默認是TimeCharacteristic.ProcessingTime
這個屬性必須設(shè)置,否則后面,可能窗口結(jié)束無法觸發(fā),導(dǎo)致結(jié)果無法輸出。取值有三種:
- ProcessingTime:事件被處理的時間。也就是由flink集群機器的系統(tǒng)時間來決定。
- EventTime:事件發(fā)生的時間。一般就是數(shù)據(jù)本身攜帶的時間。
- IngestionTime:攝入時間,數(shù)據(jù)進入flink流的時間,跟ProcessingTime還是有區(qū)別的;
指定好使用數(shù)據(jù)的實際時間來處理,接下來需要指定flink程序如何get到數(shù)據(jù)的時間字段,這里使用調(diào)用DataStream的assignTimestampsAndWatermarks方法,抽取時間和設(shè)置watermark。
前面給出的代碼中可以看出,由于發(fā)送到kafka的時候,將User對象轉(zhuǎn)換為json字符串了,這里使用的是fastjson,接收過來可以轉(zhuǎn)化為JsonObject來處理,我這里還是將其轉(zhuǎn)化為User對象JSON.parseObject(x, User.class)
,便于處理。
這里考慮到數(shù)據(jù)可能亂序,使用了可以處理亂序的抽象類BoundedOutOfOrdernessTimestampExtractor
,并且實現(xiàn)了唯一的一個沒有實現(xiàn)的方法extractTimestamp
,亂序數(shù)據(jù),會導(dǎo)致數(shù)據(jù)延遲,在構(gòu)造方法中傳入了一個Time.milliseconds(1000)
,表明數(shù)據(jù)可以延遲一秒鐘。比如說,如果窗口長度是10s,0~10s的數(shù)據(jù)會在11s的時候計算,此時watermark是10,才會觸發(fā)計算,也就是說引入watermark處理亂序數(shù)據(jù),最多可以容忍0~t這個窗口的數(shù)據(jù),最晚在t+1時刻到來。
窗口統(tǒng)計
業(yè)務(wù)需求上,通??赡苁且粋€小時,或者過去15分鐘的數(shù)據(jù),5分鐘更新一次排名,這里為了演示效果,窗口長度取10s,每次滑動(slide)5s,即5秒鐘更新一次過去10s的排名數(shù)據(jù)。
我們使用.keyBy("username")
對用戶進行分組,使用.timeWindow(Time size, Time slide)
對每個用戶做滑動窗口(10s窗口,5s滑動一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf)
做增量的聚合操作,它能使用AggregateFunction
提前聚合掉數(shù)據(jù),減少 state 的存儲壓力。較之.apply(WindowFunction wf)
會將窗口中的數(shù)據(jù)都存儲下來,最后一起計算要高效地多。aggregate()
方法的第一個參數(shù)用于
這里的CountAgg
實現(xiàn)了AggregateFunction
接口,功能是統(tǒng)計窗口中的條數(shù),即遇到一條數(shù)據(jù)就加一。
.aggregate(AggregateFunction af, WindowFunction wf)
的第二個參數(shù)WindowFunction
將每個 key每個窗口聚合后的結(jié)果帶上其他信息進行輸出。我們這里實現(xiàn)的WindowResultFunction
將用戶名,窗口,訪問量封裝成了UserViewCount
進行輸出。
TopN計算最活躍用戶
為了統(tǒng)計每個窗口下活躍的用戶,我們需要再次按窗口進行分組,這里根據(jù)UserViewCount
中的windowEnd
進行keyBy()
操作。然后使用 ProcessFunction
實現(xiàn)一個自定義的 TopN 函數(shù) TopNHotItems
來計算點擊量排名前3名的用戶,并將排名結(jié)果格式化成字符串,便于后續(xù)輸出。
ProcessFunction
是 Flink 提供的一個 low-level API,用于實現(xiàn)更高級的功能。它主要提供了定時器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來判斷何時收齊了某個 window 下所有用戶的訪問數(shù)據(jù)。由于 Watermark 的進度是全局的,在 processElement
方法中,每當收到一條數(shù)據(jù)(ItemViewCount
),我們就注冊一個 windowEnd+1
的定時器(Flink 框架會自動忽略同一時間的重復(fù)注冊)。windowEnd+1
的定時器被觸發(fā)時,意味著收到了windowEnd+1
的 Watermark,即收齊了該windowEnd
下的所有用戶窗口統(tǒng)計值。我們在 onTimer()
中處理將收集的所有商品及點擊量進行排序,選出 TopN,并將排名信息格式化成字符串后進行輸出。
這里我們還使用了 ListState<ItemViewCount>
來存儲收到的每條 UserViewCount
消息,保證在發(fā)生故障時,狀態(tài)數(shù)據(jù)的不丟失和一致性。ListState
是 Flink 提供的類似 Java List
接口的 State API,它集成了框架的 checkpoint 機制,自動做到了 exactly-once 的語義保證。
結(jié)果輸出
可以看到,每隔5秒鐘更新輸出一次數(shù)據(jù)。
作者:柯廣的網(wǎng)絡(luò)日志
微信公眾號:Java大數(shù)據(jù)與數(shù)據(jù)倉庫