Spark 在大數(shù)據(jù)開發(fā)中的最佳實踐
1前 言
eBay 智能營銷部門致力于打造數(shù)據(jù)驅(qū)動的業(yè)務智能中臺,以支持業(yè)務部門快速開展營銷活動。目前在我們正在構(gòu)建一個基于eBay站外營銷的業(yè)務全渠道漏斗分析指標,涉及近十個營銷渠道、數(shù)十張數(shù)據(jù)源表,每天處理的數(shù)據(jù)達到上百TB。由于業(yè)務復雜、數(shù)據(jù)源異構(gòu)、指標計算邏輯頻繁變更、數(shù)據(jù)體量巨大,如何快速完成數(shù)據(jù)處理開發(fā)任務是一個巨大的挑戰(zhàn)。在長時間的生產(chǎn)實踐中,我們總結(jié)了一套基于Scala開發(fā)Spark任務的可行規(guī)范,來幫助我們寫出高可讀性、高可維護性和高質(zhì)量的代碼,提升整體開發(fā)效率。
2基本開發(fā)規(guī)范
一、字段名規(guī)范?
字段名應足夠簡潔清晰,使閱讀者能快速理解字段內(nèi)容。
相似字段名應加上數(shù)據(jù)源、級別名、細分屬性加以區(qū)分,例如我們有 Google 的 click數(shù)據(jù)和內(nèi)部的click數(shù)據(jù),那么就應該使用 PARTNER_CLICK, INTERNAL_CLICK 來命名不同的點擊數(shù)據(jù)。
二、業(yè)務字典
對于公司已有統(tǒng)一命名的專業(yè)術(shù)語,應采用此命名方式,例如 GMB。
對于公司級別命名并未統(tǒng)一的專業(yè)術(shù)語,在 domain 或 team 內(nèi)部應有統(tǒng)一的命名規(guī)范。比如你的ETL任務中用到了多個系統(tǒng)的數(shù)據(jù),對于用戶ID,系統(tǒng)A里面叫user_id,系統(tǒng)B里面叫u_id,系統(tǒng)C里面叫mapped_id,當經(jīng)過我們數(shù)據(jù)清洗流程后我們應該將這些字段統(tǒng)一成同個概念,比如USER_ID。
數(shù)據(jù) schema 中字段名應用下劃線分割,而代碼變量仍舊應該采用駝峰命名法,字段與變量應該有默認對應關(guān)系。
建議維護一個業(yè)務名詞庫用來統(tǒng)一定義專業(yè)概念和術(shù)語,注明是公司級別的術(shù)語或是 domain/team 級別的術(shù)語,級別名稱應在字段名上體現(xiàn)。
三、冪等性
一個spark任務應該是冪等的,這個任務在有同樣的輸入時被執(zhí)行多次輸出是恒定的,不應該產(chǎn)生副作用。
四、數(shù)值類型
在分析計算需求的時候,需要先對數(shù)值類型進行分類,不同的數(shù)值類型的計算方式也會不同。
原始數(shù)值指標:由事件帶出的數(shù)值指標,在定比數(shù)據(jù)級別(ratio level),可以直接進行算數(shù)運算
?? 示例:Clicks,GMB,Spend,Watch Count等
?? 對于一個廣告系列中,我們可以直接將廣告系列中的產(chǎn)品的GMB直接相加得到總GMB
衍生數(shù)值指標:由原始數(shù)值指標進行衍生計算而獲得的指標,適用于固定場景。根據(jù)衍生計算邏輯,不一定能直接進行算數(shù)運算。因而,在計算涉及衍生數(shù)值指標時,需考慮該邏輯的影響。
?? 示例:CPC(每次點擊成本=廣告費用/點擊量),ROAS(支出回報率=廣告收入/廣告費用)
?? 對于一個廣告系列,我們不能直接將廣告系列中的CPC相加得到總CPC
定類數(shù)據(jù) (Nominal level)
定類數(shù)據(jù)不作為數(shù)值指標,不可進行算數(shù)計算。
3基本編碼規(guī)范
一、建議將建表DDL和寫數(shù)據(jù)分離,并且不要在編碼中使用drop+create來覆寫表數(shù)據(jù)
當使用drop table再重建table的方式刷新表數(shù)據(jù)時,會有一定的風險。因為 drop table 和 create table 是非原子性操作,如果drop table完成后,重建的sql因為某些不可抗原因失敗了,會直接導致數(shù)據(jù)丟失,而這個表也變成不可用狀態(tài)。
如下sql,如果create table失敗,table將處于不可用狀態(tài):
更佳的方式應該如下:
當數(shù)據(jù)重新生成完以后只需要使用原子操作更新hive的location即可,這樣就可以保證每次寫入數(shù)據(jù)時不影響表的使用。
二、特殊的邏輯應該要有注釋,比如 ,應該說明這個字段和對應的值的作用,或者定義一個常量來語義化這個魔法值,比如:
三、在hive中沒有布爾值,禁止使用true/false,它在hive中會變成字符串‘true’/’false’,所以請使用數(shù)值類型代替布爾類型。
四、若使用Spark SQL編寫代碼,那么最好不要將核心的SQL邏輯拆分成片段,這樣會使可讀性下降。對于有多段邏輯的Job,需要讓代碼更多的暴露出表操作的核心邏輯。
以下是一個反例的偽代碼,過度的函數(shù)分裝會使代碼可讀性下降,從而無法快速直觀的了解表操作的邏輯,那么就需要添加相關(guān)的注釋方便閱讀:
稍微修改一下,以下是偽代碼, 我們可以將表操作的邏輯暴露出來,而把非核心邏輯封裝起來,這樣我們可以輕松理解這段代碼到底是在做什么:
4Spark開發(fā)最佳實踐
一、使用Spark cache時,需要考慮它能否帶來計算時間上的提升。Spark cache是使用給定的存儲級別來緩存表的內(nèi)容或查詢的輸出內(nèi)容,常用于未來查詢中復用原始文件的場景。
Cache的存儲級別分為以下幾種:
NONE:不進行緩存
DISK_ONLY:只在磁盤中緩存
DISKONLY_2:只在磁盤中緩存并進行2次備份
MEMORY_ONLY:只在內(nèi)存中緩存
MEMORY_ONLY_2:只在內(nèi)存中緩存并進行2次備份
MEMORY_ONLY_SER:只在內(nèi)存中緩存并進行序列化
MEMORY_ONLY_SER_2:只在內(nèi)存中緩存并進行序列化和2次備份
MEMORY_AND_DISK:在內(nèi)存中緩存,如果內(nèi)存不足將寫入磁盤 (默認緩存級別)
MEMORY_AND_DISK_2 :在內(nèi)存中緩存并進行2次備份,如果內(nèi)存不足將寫入磁盤
MEMORY_AND_DISK_SER:在內(nèi)存中緩存并序列化,如果內(nèi)存不足將寫入磁盤
MEMORY_AND_DISK_SER_2 :在內(nèi)存中緩存并序列化和2次備份,如果內(nèi)存不足將寫入磁盤
OFF_HEAP:使用堆外內(nèi)緩存
如果所需要 cache的數(shù)據(jù)集過大,使用 MEMORY_ONLY 容易導致OOM;而使用默認的MEMORY_AND_DISK,當內(nèi)存不適合寫入時則會寫入磁盤,這時I/O會占用大量時間,并且由于內(nèi)存減少導致頻繁GC,反而使效率下降。在使用 cache 的時候需要平衡好數(shù)據(jù) I/O 的開銷和計算資源的使用。如果一個數(shù)據(jù)集cache消耗的I/O時間不是明顯小于直接重計算消耗的時間,不建議使用cache。
以下是一個例子,可以看到這里 被使用了兩次,那么對于這種場景我們需要權(quán)衡通過join計算和cache所消耗的I/O的代價。 是由一張小表 join大表生成的,如果在join完后我們添加了cache,數(shù)據(jù)量仍舊非常大,cache數(shù)據(jù)時會產(chǎn)生額外的磁盤寫入開銷;而考慮到這個 join 操作本身所需要的計算時間并不多,如果從時間性能的角度考慮,這個case我們就不應該使用cache。
二、DataFrame的 API 和Spark SQL中的 union 行為是不一致的,DataFrame中union默認不會進行去重,Spark SQL union 默認會進行去重。
三、兩個DataFrame來源于同一個數(shù)據(jù)源,如果直接將它們join則會報以下錯:
Detected implicit cartesian product for LEFT(INNER/RIGHT) OUTER join between logical plans
由于來自同一個數(shù)據(jù)源的DataFrame join很容易產(chǎn)生笛卡爾積,所以Spark默認禁止這種行為。但是在一些業(yè)務場景中的確有這種join的情況,解決方案有兩種:
在join前將數(shù)據(jù)存儲到臨時目錄(一般是HDFS),再重新加載進來,用來截斷血緣。
添加spark配置:spark.sql.crossJoin.enabled=true 但是不建議這么做,這樣會導致其他可能有隱患的join也被忽略了
四、寫入分區(qū)表時,Spark會默認覆蓋所有分區(qū),如果只是想覆蓋當前DataFrame中有數(shù)據(jù)的分區(qū),需要配置如下參數(shù)開啟動態(tài)分區(qū),動態(tài)分區(qū)會在有數(shù)據(jù)需要寫入分區(qū)時才會將當前分區(qū)清空。需要注意的是開啟動態(tài)分區(qū)會導致寫入效率下降:
五、DataFrame中使用udf時,需要注意udf的參數(shù)如果是基礎類型則必須不為空,否則不會被執(zhí)行。
示例:如下代碼,一個簡單根據(jù)int值生成對應的flag,但是如果norb是null的話,那么這段udf不會被執(zhí)行,對應的norbFlag為null。對于這種由于null值導致的邏輯不一樣問題,可以借助DataFrameNaFunctions 來協(xié)助處理null值情況。
六、Spark原生不支持數(shù)據(jù)更改,所以對一些非分區(qū)表更新數(shù)據(jù)是有挑戰(zhàn)的。這里我們可以借鑒一個類似delta lake的upsert方案「1」:取出歷史數(shù)據(jù),按照唯一鍵將需要upsert的數(shù)據(jù)挖去,再和待添加的數(shù)據(jù)做union,可以實現(xiàn)更新有唯一鍵的表的功能。以下是示例代碼:
5后 記
使用 Spark 開發(fā)大數(shù)據(jù) ETL 已經(jīng)成為業(yè)界的主流方案。此篇文章總結(jié)了我們在使用 Spark 過程中所遇到的挑戰(zhàn)和技術(shù)案例,希望能夠拋磚引玉,引出更多更好的實踐方案。最后,也要感謝楊青波對此文章的貢獻,以及劉煉和劉軼的審稿。
參考
「1」https://github.com/delta-io/delta/blob/73ca6fcea0a25f302ee655f9849f86832bbe5f23/examples/scala/src/main/scala/example/QuickstartSQL.scala
作者:楊青波
歡迎關(guān)注:大數(shù)據(jù)夢想家