美團最初的數據處理以Hive SQL為主,底層計算引擎為MapReduce,部分相對復雜的業務會由工程師編寫MapReduce程序實現。隨著業務的發展,單純的Hive SQL查詢或者MapReduce程序已經越來越難以滿足數據處理和分析的需求。
一方面,MapReduce計算模型對多輪迭代的DAG作業支持不給力,每輪迭代都需要將數據落盤,極大地影響了作業執行效率,另外只提供Map和Reduce這兩種計算因子,使得用戶在實現迭代式計算(比如:機器學習算法)時成本高且效率低。
另一方面,在數據倉庫的按天生產中,由于某些原始日志是半結構化或者非結構化數據,因此,對其進行清洗和轉換操作時,需要結合SQL查詢以及復雜的過程式邏輯處理,這部分工作之前是由Hive SQL結合Python腳本來完成。這種方式存在效率問題,當數據量比較大的時候,流程的運行時間較長,這些ETL流程通常處于比較上游的位置,會直接影響到一系列下游的完成時間以及各種重要數據報表的生成。
基于以上原因,美團在2014年的時候引入了Spark。為了充分利用現有Hadoop集群的資源,我們采用了Spark on Yarn模式,所有的Spark app以及MapReduce作業會通過Yarn統一調度執行。Spark在美團數據平臺架構中的位置如圖所示:
下面將介紹Spark在美團的實踐,包括基于Spark所做的平臺化工作以及Spark在生產環境下的應用案例。其中包含Zeppelin結合的交互式開發平臺,也有使用Spark任務完成的ETL數據轉換工具,數據挖掘組基于Spark開發了特征平臺和數據挖掘平臺,另外還有基于Spark的交互式用戶行為分析系統以及在SEM投放服務中的應用,以下是詳細介紹。
Spark交互式開發平臺
在推廣如何使用Spark的過程中,我們總結了用戶開發應用的主要需求:
數據調研:在正式開發程序之前,首先需要認識待處理的業務數據,包括:數據格式,類型(若以表結構存儲則對應到字段類型)、存儲方式、有無臟數據,甚至分析根據業務邏輯實現是否可能存在數據傾斜等等。這個需求十分基礎且重要,只有對數據有充分的掌控,才能寫出高效的Spark代碼;
代碼調試:業務的編碼實現很難保證一蹴而就,可能需要不斷地調試;如果每次少量的修改,測試代碼都需要經過編譯、打包、提交線上,會對用戶的開發效率影響是非常大的;
聯合開發:對于一整個業務的實現,一般會有多方的協作,這時候需要能有一個方便的代碼和執行結果共享的途徑,用于分享各自的想法和試驗結論。
基于這些需求,我們調研了現有的開源系統,最終選擇了Apache的孵化項目Zeppelin,國內服務器租用服務器托管,將其作為基于Spark的交互式開發平臺。Zeppelin整合了Spark,Markdown,Shell,Angular等引擎,集成了數據分析和可視化等功能。
我們在原生的Zeppelin上增加了用戶登陸認證、用戶行為日志審計、權限管理以及執行Spark作業資源隔離,打造了一個美團的Spark的交互式開發平臺,不同的用戶可以在該平臺上調研數據、調試程序、共享代碼和結論。
集成在Zeppelin的Spark提供了三種解釋器:Spark、Pyspark、SQL,分別適用于編寫Scala、Python、SQL代碼。對于上述的數據調研需求,無論是程序設計之初,還是編碼實現過程中,當需要檢索數據信息時,通過Zeppelin提供的SQL接口可以很便利的獲取到分析結果;另外,Zeppelin中Scala和Python解釋器自身的交互式特性滿足了用戶對Spark和Pyspark分步調試的需求,同時由于Zeppelin可以直接連接線上集群,因此可以滿足用戶對線上數據的讀寫處理請求;最后,Zeppelin使用Web Socket通信,用戶只需要簡單地發送要分享內容所在的http鏈接,所有接受者就可以同步感知代碼修改,運行結果等,實現多個開發者協同工作。
Spark作業ETL模板 除了提供平臺化的工具以外,我們也會從其他方面來提高用戶的開發效率,比如將類似的需求進行封裝,提供一個統一的ETL模板,讓用戶可以很方便的使用Spark實現業務需求。
美團目前的數據生產主體是通過ETL將原始的日志通過清洗、轉換等步驟后加載到Hive表中。而很多線上業務需要將Hive表里面的數據以一定的規則組成鍵值對,導入到Tair中,用于上層應用快速訪問。其中大部分的需求邏輯相同,即把Hive表中幾個指定字段的值按一定的規則拼接成key值,另外幾個字段的值以json字符串的形式作為value值,最后將得到的對寫入Tair。