中國IDC圈4月22日報道,這篇文章算是個科普貼。假如已經熟悉Spark的就略過吧。
媒介
許多初學者其實對Spark的編程模式照舊RDD這個觀念領略不到位,就會發(fā)生一些誤解。
好比,許多時候我們經常覺得一個文件是會被完整讀入到內存,然后做各類調動,這很大概是受兩個觀念的誤導:
RDD的界說,RDD是一個漫衍式的不行變數據薈萃
Spark 是一個內存處理懲罰引擎
假如你沒有主動對RDDCache/Persist,它不外是一個觀念上存在的虛擬數據集,你實際上是看不到這個RDD的數據的全集的(他不會真的都放到內存里)。
RDD的本質是什么
一個RDD 本質上是一個函數,而RDD的調動不外是函數的嵌套。RDD我認為有兩類:
輸入RDD,典范如KafkaRDD,JdbcRDD
轉換RDD,如MapPartitionsRDD
我們以下面的代碼為例做闡明:
sc.textFile(“abc.log”).map().saveAsTextFile(“”)
textFile 會構建出一個NewHadoopRDD,
map函數運行后會構建出一個MapPartitionsRDD
saveAsTextFile觸發(fā)了實際流程代碼的執(zhí)行
所以RDD不外是對一個函數的封裝,當一個函數對數據處理懲罰完成后,我們就獲得一個RDD的數據集(是一個虛擬的,后續(xù)會表明)。
NewHadoopRDD是數據來歷,每個parition認真獲取數據,得到進程是通過iterator.next 得到一條一筆記錄的。假設某個時刻拿到了一條數據A,這個A會立即被map里的函數處理懲罰獲得B(完成了轉換),然后開始寫入到HDFS上。其他數據反復如此。所以整個進程:
理論上某個MapPartitionsRDD里實際在內存里的數據便是其Partition的數目,是個很是小的數值。
NewHadoopRDD則會略多些,因為屬于數據源,讀取文件,假設讀取文件的buffer是1M,那么最多也就是partitionNum*1M 數據在內存里
saveAsTextFile也是一樣的,往HDFS寫文件,需要buffer,最大都據量為 buffer* partitionNum
所以整個進程其實是流式的進程,一條數據被各個RDD所包裹的函數處理懲罰。
適才我重復提到了嵌套函數,怎么知道它是嵌套的呢?
假如你寫了這樣一個代碼:
sc.textFile(“abc.log”).map().map()………map().saveAsTextFile(“”)
有成千上萬個map,很大概就倉庫溢出了。為啥?實際上是函數嵌套太深了。
按上面的邏輯,內存利用其實長短常小的,10G內存跑100T數據也不是難事。可是為什么Spark經常因為內存問題掛掉呢? 我們接著往下看。
Shuffle的本質是什么?
這就是為什么要分Stage了。每個Stage其實就是我上面說的那樣,一套數據被N個嵌套的函數處理懲罰(也就是你的transform行動)。碰著了Shuffle,就被切開來,所謂的Shuffle,本質上是把數據按法則姑且都落到磁盤上,相當于完成了一個saveAsTextFile的行動,不外是存當地磁盤。然后被切開的下一個Stage則以當地磁盤的這些數據作為數據源,從頭走上面描寫的流程。
我們再做一次描寫:
所謂Shuffle不外是把處理懲罰流程切分,給切分的上一段(我們稱為Stage M)加個存儲到磁盤的Action行動,把切分的下一段(Stage M+1)數據源釀成Stage M存儲的磁盤文件。每個Stage都可以走我上面的描寫,讓每條數據都可以被N個嵌套的函數處理懲罰,最后通過用戶指定的行動舉辦存儲。
為什么Shuffle 容易導致Spark掛掉
前面我們提到,Shuffle不外是偷偷的幫你加上了個雷同saveAsLocalDiskFile的行動。然而,寫磁盤是一個奮發(fā)的行動。所以我們盡大概的把數據先放到內存,再批量寫到文件里,尚有讀磁盤文件也是給費內存的行動。把數據放內存,就碰著個問題,好比10000條數據,到底會占用幾多內存?這個其實很難預估的。所以一不小心,就容易導致內存溢出了。這其實也是一個很無奈的工作。
我們做Cache/Persist意味著什么?
其實就是給某個Stage加上了一個saveAsMemoryBlockFile的行動,然后下次再要數據的時候,就不消算了。這些存在內存的數據就暗示了某個RDD處理懲罰后的功效。這個才是說為啥Spark是內存計較引擎的處所。在MR里,你是要放到HDFS里的,但Spark答允你把中間功效放內存里。
總結
我們從一個較新的角度表明白RDD 和Shuffle 都是一個什么樣的對象。
,