Spark:比Hadoop更強大的分布式數(shù)據(jù)計算項目
Spark是一個由加州大學伯克利分校(UC Berkeley AMP)開發(fā)的一個分布式數(shù)據(jù)快速分析項目。它的核心技術是彈性分布式數(shù)據(jù)集(Resilient distributed datasets),提供了比Hadoop更加豐富的MapReduce模型,可以快速在內(nèi)存中對數(shù)據(jù)集進行多次迭代,來支持復雜的數(shù)據(jù)挖掘算法和圖計算算法。
Spark使用Scala開發(fā),使用Mesos作為底層的調度框架,可以和hadoop和Ec2緊密集成,直接讀取hdfs或S3的文件進行計算并把結果寫回hdfs或S3,是Hadoop和Amazon云計算生態(tài)圈的一部分。Spark是一個小巧玲瓏的項目,項目的core部分的代碼只有63個Scala文件,充分體現(xiàn)了精簡之美。
Spark之依賴
Map Reduce模型:作為一個分布式計算框架,Spark采用了MapReduce模型。在它身上,Google的Map Reduce和Hadoop的痕跡很重,很明顯,它并非一個大的創(chuàng)新,而是微創(chuàng)新。在基礎理念不變的前提下,它借鑒,模仿并依賴了先輩,加入了一點改進,極大的提升了MapReduce的效率。
函數(shù)式編程:Spark由Scala寫就,而支持的語言亦是Scala。其原因之一就是Scala支持函數(shù)式編程。這一來造就了Spark的代碼簡潔,二來使得基于Spark開發(fā)的程序,也特別的簡潔。一次完整的MapReduce,Hadoop中需要創(chuàng)建一個Mapper類和Reduce類,而Spark只需要創(chuàng)建相應的一個map函數(shù)和reduce函數(shù)即可,代碼量大大降低。
Mesos:Spark將分布式運行的需要考慮的事情,都交給了Mesos,自己不Care,這也是它代碼能夠精簡的原因之一。
HDFS和S3:Spark支持2種分布式存儲系統(tǒng):HDFS和S3。應該算是目前最主流的兩種了。對文件系統(tǒng)的讀取和寫入功能是Spark自己提供的,借助Mesos分布式實現(xiàn)。
Spark與Hadoop的對比
Spark的中間數(shù)據(jù)放到內(nèi)存中,對于迭代運算效率更高。Spark更適合于迭代運算比較多的ML和DM運算。因為在Spark里面,有RDD的抽象概念。
Spark比Hadoop更通用。
Spark提供的數(shù)據(jù)集操作類型有很多種,不像Hadoop只提供了Map和Reduce兩種操作。比如map,filter,flatMap,sample,groupByKey,reduceByKey,union,join,cogroup,mapValues,sort,partionBy等多種操作類型,Spark把這些操作稱為Transformations。同時還提供Count,collect,reduce,lookup,save等多種actions操作。
這些多種多樣的數(shù)據(jù)集操作類型,給給開發(fā)上層應用的用戶提供了方便。各個處理節(jié)點之間的通信模型不再像Hadoop那樣就是唯一的Data Shuffle一種模式。用戶可以命名,物化,控制中間結果的存儲、分區(qū)等??梢哉f編程模型比Hadoop更靈活。
不過由于RDD的特性,Spark不適用那種異步細粒度更新狀態(tài)的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應用模型不適合。
容錯性。在分布式數(shù)據(jù)集計算時通過checkpoint來實現(xiàn)容錯,而checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現(xiàn)容錯。
可用性。Spark通過提供豐富的Scala, Java,Python API及交互式Shell來提高可用性。
Spark與Hadoop的結合
Spark可以直接對HDFS進行數(shù)據(jù)的讀寫,同樣支持Spark on YARN。Spark可以與MapReduce運行于同集群中,共享存儲資源與計算,數(shù)據(jù)倉庫Shark實現(xiàn)上借用Hive,幾乎與Hive完全兼容。
Spark的核心概念
Resilient Distributed Dataset (RDD)彈性分布數(shù)據(jù)集
RDD是Spark的最基本抽象,是對分布式內(nèi)存的抽象使用,實現(xiàn)了以操作本地集合的方式來操作分布式數(shù)據(jù)集的抽象實現(xiàn)。RDD是Spark最核心的東西,它表示已被分區(qū),不可變的并能夠被并行操作的數(shù)據(jù)集合,不同的數(shù)據(jù)集格式對應不同的RDD實現(xiàn)。RDD必須是可序列化的。RDD可以cache到內(nèi)存中,每次對RDD數(shù)據(jù)集的操作之后的結果,都可以存放到內(nèi)存中,下一個操作可以直接從內(nèi)存中輸入,省去了MapReduce大量的磁盤IO操作。這對于迭代運算比較常見的機器學習算法, 交互式數(shù)據(jù)挖掘來說,效率提升比較大。
RDD的特點:
它是在集群節(jié)點上的不可變的、已分區(qū)的集合對象。
通過并行轉換的方式來創(chuàng)建如(map, filter, join, etc)。
失敗自動重建。
可以控制存儲級別(內(nèi)存、磁盤等)來進行重用。
必須是可序列化的。
是靜態(tài)類型的。
RDD的好處:
RDD只能從持久存儲或通過Transformations操作產(chǎn)生,相比于分布式共享內(nèi)存(DSM)可以更高效實現(xiàn)容錯,對于丟失部分數(shù)據(jù)分區(qū)只需根據(jù)它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
RDD的不變性,可以實現(xiàn)類Hadoop MapReduce的推測式執(zhí)行。
RDD的數(shù)據(jù)分區(qū)特性,可以通過數(shù)據(jù)的本地性來提高性能,這與Hadoop MapReduce是一樣的。
RDD都是可序列化的,在內(nèi)存不足時可自動降級為磁盤存儲,把RDD存儲于磁盤上,這時性能會有大的下降但不會差于現(xiàn)在的MapReduce。
RDD的存儲與分區(qū):
用戶可以選擇不同的存儲級別存儲RDD以便重用。
當前RDD默認是存儲于內(nèi)存,但當內(nèi)存不足時,RDD會spill到disk。
RDD在需要進行分區(qū)把數(shù)據(jù)分布于集群中時會根據(jù)每條記錄Key進行分區(qū)(如Hash 分區(qū)),以此保證兩個數(shù)據(jù)集在Join時能高效。
RDD的內(nèi)部表示:
分區(qū)列表(數(shù)據(jù)塊列表)
計算每個分片的函數(shù)(根據(jù)父RDD計算出此RDD)
對父RDD的依賴列表
對key-value RDD的Partitioner【可選】
每個數(shù)據(jù)分片的預定義地址列表(如HDFS上的數(shù)據(jù)塊的地址)【可選】
RDD的存儲級別:RDD根據(jù)useDisk、useMemory、deserialized、replication四個參數(shù)的組合提供了11種存儲級別。RDD定義了各種操作,不同類型的數(shù)據(jù)由不同的RDD類抽象表示,不同的操作也由RDD進行抽實現(xiàn)。
RDD有兩種創(chuàng)建方式:
從Hadoop文件系統(tǒng)(或與Hadoop兼容的其它存儲系統(tǒng))輸入(例如HDFS)創(chuàng)建。
從父RDD轉換得到新RDD。
Spark On Mesos
Spark支持Local調用和Mesos集群兩種模式,在Spark上開發(fā)算法程序,可以在本地模式調試成功后,直接改用Mesos集群運行,除了文件的保存位置需要考慮以外,算法理論上不需要做任何修改。Spark的本地模式支持多線程,有一定的單機并發(fā)處理能力。但是不算很強勁。本地模式可以保存結果在本地或者分布式文件系統(tǒng),而Mesos模式一定需要保存在分布式或者共享文件系統(tǒng)。
為了在Mesos框架上運行,安裝Mesos的規(guī)范和設計,Spark實現(xiàn)兩個類,一個是SparkScheduler,在Spark中類名是MesosScheduler;一個是SparkExecutor,在Spark中類名是Executor。有了這兩個類,Spark就可以通過Mesos進行分布式的計算。Spark會將RDD和MapReduce函數(shù),進行一次轉換,變成標準的Job和一系列的Task。提交給SparkScheduler,SparkScheduler會把Task提交給Mesos Master,由Master分配給不同的Slave,最終由Slave中的Spark Executor,將分配到的Task一一執(zhí)行,并且返回,組成新的RDD,或者直接寫入到分布式文件系統(tǒng)。

Transformations & Actions
對于RDD可以有兩種計算方式:轉換(返回值還是一個RDD)與操作(返回值不是一個RDD)。
轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執(zhí)行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,并不會去執(zhí)行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
操作(Actions) (如:count, collect, save等),Actions操作會返回結果或把RDD數(shù)據(jù)寫到存儲系統(tǒng)中。Actions是觸發(fā)Spark啟動計算的動因。
它們本質區(qū)別是:Transformation返回值還是一個RDD。它使用了鏈式調用的設計模式,對一個RDD進行計算后,變換成另外一個RDD,然后這個RDD又可以進行另外一次轉換。這個過程是分布式的。Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統(tǒng)中。關于這兩個動作,在Spark開發(fā)指南中會有就進一步的詳細介紹,它們是基于Spark開發(fā)的核心。這里將Spark的官方ppt中的一張圖略作改造,闡明一下兩種動作的區(qū)別。

Lineage(血統(tǒng))
利用內(nèi)存加快數(shù)據(jù)加載,在眾多的其它的In-Memory類數(shù)據(jù)庫或Cache類系統(tǒng)中也有實現(xiàn),Spark的主要區(qū)別在于它處理分布式運算環(huán)境下的數(shù)據(jù)容錯性(節(jié)點實效/數(shù)據(jù)丟失)問題時采用的方案。為了保證RDD中數(shù)據(jù)的魯棒性,RDD數(shù)據(jù)集通過所謂的血統(tǒng)關系(Lineage)記住了它是如何從其它RDD中演變過來的。相比其它系統(tǒng)的細顆粒度的內(nèi)存數(shù)據(jù)更新級別的備份或者LOG機制,RDD的Lineage記錄的是粗顆粒度的特定數(shù)據(jù)轉換(Transformation)操作(filter, map, join etc.)行為。當這個RDD的部分分區(qū)數(shù)據(jù)丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數(shù)據(jù)分區(qū)。這種粗顆粒的數(shù)據(jù)模型,限制了Spark的運用場合,但同時相比細顆粒度的數(shù)據(jù)模型,也帶來了性能的提升。
RDD在Lineage依賴方面分為兩種Narrow Dependencies與Wide Dependencies用來解決數(shù)據(jù)容錯的高效性。
Narrow Dependencies是指父RDD的每一個分區(qū)最多被一個子RDD的分區(qū)所用,表現(xiàn)為一個父RDD的分區(qū)對應于一個子RDD的分區(qū)或多個父RDD的分區(qū)對應于一個子RDD的分區(qū),也就是說一個父RDD的一個分區(qū)不可能對應一個子RDD的多個分區(qū)。
Wide Dependencies是指子RDD的分區(qū)依賴于父RDD的多個分區(qū)或所有分區(qū),也就是說存在一個父RDD的一個分區(qū)對應一個子RDD的多個分區(qū)。對與Wide Dependencies,這種計算的輸入和輸出在不同的節(jié)點上,lineage方法對與輸入節(jié)點完好,而輸出節(jié)點宕機時,通過重新計算,這種情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統(tǒng)的意思),Narrow Dependencies對于數(shù)據(jù)的重算開銷要遠小于Wide Dependencies的數(shù)據(jù)重算開銷。
在RDD計算,通過checkpint進行容錯,做checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現(xiàn)容錯,默認是logging the updates方式,通過記錄跟蹤所有生成RDD的轉換(transformations)也就是記錄每個RDD的lineage(血統(tǒng))來重新計算生成丟失的分區(qū)數(shù)據(jù)。
Spark的Shuffle過程介紹
Shuffle Writer
Spark豐富了任務類型,有些任務之間數(shù)據(jù)流轉不需要通過Shuffle,但是有些任務之間還是需要通過Shuffle來傳遞數(shù)據(jù),比如wide dependency的group by key。
Spark中需要Shuffle輸出的Map任務會為每個Reduce創(chuàng)建對應的bucket,Map產(chǎn)生的結果會根據(jù)設置的partitioner得到對應的bucketId,然后填充到相應的bucket中去。每個Map的輸出結果可能包含所有的Reduce所需要的數(shù)據(jù),所以每個Map會創(chuàng)建R個bucket(R是reduce的個數(shù)),M個Map總共會創(chuàng)建M*R個bucket。
Map創(chuàng)建的bucket其實對應磁盤上的一個文件,Map的結果寫到每個bucket中其實就是寫到那個磁盤文件中,這個文件也被稱為blockFile,是Disk Block Manager管理器通過文件名的Hash值對應到本地目錄的子目錄中創(chuàng)建的。每個Map要在節(jié)點上創(chuàng)建R個磁盤文件用于結果輸出,Map的結果是直接輸出到磁盤文件上的,100KB的內(nèi)存緩沖是用來創(chuàng)建Fast Buffered OutputStream輸出流。這種方式一個問題就是Shuffle文件過多。

針對上述Shuffle過程產(chǎn)生的文件過多問題,Spark有另外一種改進的Shuffle過程:consolidation Shuffle,以期顯著減少Shuffle文件的數(shù)量。在consolidation Shuffle中每個bucket并非對應一個文件,而是對應文件中的一個segment部分。Job的map在某個節(jié)點上第一次執(zhí)行,為每個reduce創(chuàng)建bucket對應的輸出文件,把這些文件組織成ShuffleFileGroup,當這次map執(zhí)行完之后,這個ShuffleFileGroup可以釋放為下次循環(huán)利用;當又有map在這個節(jié)點上執(zhí)行時,不需要創(chuàng)建新的bucket文件,而是在上次的ShuffleFileGroup中取得已經(jīng)創(chuàng)建的文件繼續(xù)追加寫一個segment;當前次map還沒執(zhí)行完,ShuffleFileGroup還沒有釋放,這時如果有新的map在這個節(jié)點上執(zhí)行,無法循環(huán)利用這個ShuffleFileGroup,而是只能創(chuàng)建新的bucket文件組成新的ShuffleFileGroup來寫輸出。

比如一個Job有3個Map和2個reduce:(1) 如果此時集群有3個節(jié)點有空槽,每個節(jié)點空閑了一個core,則3個Map會調度到這3個節(jié)點上執(zhí)行,每個Map都會創(chuàng)建2個Shuffle文件,總共創(chuàng)建6個Shuffle文件;(2) 如果此時集群有2個節(jié)點有空槽,每個節(jié)點空閑了一個core,則2個Map先調度到這2個節(jié)點上執(zhí)行,每個Map都會創(chuàng)建2個Shuffle文件,然后其中一個節(jié)點執(zhí)行完Map之后又調度執(zhí)行另一個Map,則這個Map不會創(chuàng)建新的Shuffle文件,而是把結果輸出追加到之前Map創(chuàng)建的Shuffle文件中;總共創(chuàng)建4個Shuffle文件;(3) 如果此時集群有2個節(jié)點有空槽,一個節(jié)點有2個空core一個節(jié)點有1個空core,則一個節(jié)點調度2個Map一個節(jié)點調度1個Map,調度2個Map的節(jié)點上,一個Map創(chuàng)建了Shuffle文件,后面的Map還是會創(chuàng)建新的Shuffle文件,因為上一個Map還正在寫,它創(chuàng)建的ShuffleFileGroup還沒有釋放;總共創(chuàng)建6個Shuffle文件。
Shuffle Fetcher
Reduce去拖Map的輸出數(shù)據(jù),Spark提供了兩套不同的拉取數(shù)據(jù)框架:通過socket連接去取數(shù)據(jù);使用netty框架去取數(shù)據(jù)。
每個節(jié)點的Executor會創(chuàng)建一個BlockManager,其中會創(chuàng)建一個BlockManagerWorker用于響應請求。當Reduce的GET_BLOCK的請求過來時,讀取本地文件將這個blockId的數(shù)據(jù)返回給Reduce。如果使用的是Netty框架,BlockManager會創(chuàng)建ShuffleSender用于發(fā)送Shuffle數(shù)據(jù)。并不是所有的數(shù)據(jù)都是通過網(wǎng)絡讀取,對于在本節(jié)點的Map數(shù)據(jù),Reduce直接去磁盤上讀取而不再通過網(wǎng)絡框架。
Reduce拖過來數(shù)據(jù)之后以什么方式存儲呢?Spark Map輸出的數(shù)據(jù)沒有經(jīng)過排序,Spark Shuffle過來的數(shù)據(jù)也不會進行排序,Spark認為Shuffle過程中的排序不是必須的,并不是所有類型的Reduce需要的數(shù)據(jù)都需要排序,強制地進行排序只會增加Shuffle的負擔。Reduce拖過來的數(shù)據(jù)會放在一個HashMap中,HashMap中存儲的也是<key, value>對,key是Map輸出的key,Map輸出對應這個key的所有value組成HashMap的value。Spark將Shuffle取過來的每一個<key, value>對插入或者更新到HashMap中,來一個處理一個。HashMap全部放在內(nèi)存中。
Shuffle取過來的數(shù)據(jù)全部存放在內(nèi)存中,對于數(shù)據(jù)量比較小或者已經(jīng)在Map端做過合并處理的Shuffle數(shù)據(jù),占用內(nèi)存空間不會太大,但是對于比如group by key這樣的操作,Reduce需要得到key對應的所有value,并將這些value組一個數(shù)組放在內(nèi)存中,這樣當數(shù)據(jù)量較大時,就需要較多內(nèi)存。
當內(nèi)存不夠時,要不就失敗,要不就用老辦法把內(nèi)存中的數(shù)據(jù)移到磁盤上放著。Spark意識到在處理數(shù)據(jù)規(guī)模遠遠大于內(nèi)存空間時所帶來的不足,引入了一個具有外部排序的方案。Shuffle過來的數(shù)據(jù)先放在內(nèi)存中,當內(nèi)存中存儲的<key, value>對超過1000并且內(nèi)存使用超過70%時,判斷節(jié)點上可用內(nèi)存如果還足夠,則把內(nèi)存緩沖區(qū)大小翻倍,如果可用內(nèi)存不再夠了,則把內(nèi)存中的<key, value>對排序然后寫到磁盤文件中。最后把內(nèi)存緩沖區(qū)中的數(shù)據(jù)排序之后和那些磁盤文件組成一個最小堆,每次從最小堆中讀取最小的數(shù)據(jù),這個和MapReduce中的merge過程類似。
MapReduce和Spark的Shuffle過程對比

Spark的資源管理與作業(yè)調度
Spark對于資源管理與作業(yè)調度可以使用本地模式,Standalone(獨立模式),Apache Mesos及Hadoop YARN來實現(xiàn)。Spark on Yarn在Spark0.6時引用,但真正可用是在現(xiàn)在的branch-0.8版本。Spark on Yarn遵循YARN的官方規(guī)范實現(xiàn),得益于Spark天生支持多種Scheduler和Executor的良好設計,對YARN的支持也就非常容易,Spark on Yarn的大致框架圖。

讓Spark運行于YARN上與Hadoop共用集群資源可以提高資源利用率。
編程接口
Spark通過與編程語言集成的方式暴露RDD的操作,類似于DryadLINQ和FlumeJava,每個數(shù)據(jù)集都表示為RDD對象,對數(shù)據(jù)集的操作就表示成對RDD對象的操作。Spark主要的編程語言是Scala,選擇Scala是因為它的簡潔性(Scala可以很方便在交互式下使用)和性能(JVM上的靜態(tài)強類型語言)。
Spark和Hadoop MapReduce類似,由Master(類似于MapReduce的Jobtracker)和Workers(Spark的Slave工作節(jié)點)組成。用戶編寫的Spark程序被稱為Driver程序,Dirver程序會連接master并定義了對各RDD的轉換與操作,而對RDD的轉換與操作通過Scala閉包(字面量函數(shù))來表示,Scala使用Java對象來表示閉包且都是可序列化的,以此把對RDD的閉包操作發(fā)送到各Workers節(jié)點。 Workers存儲著數(shù)據(jù)分塊和享有集群內(nèi)存,是運行在工作節(jié)點上的守護進程,當它收到對RDD的操作時,根據(jù)數(shù)據(jù)分片信息進行本地化數(shù)據(jù)操作,生成新的數(shù)據(jù)分片、返回結果或把RDD寫入存儲系統(tǒng)。

Scala:Spark使用Scala開發(fā),默認使用Scala作為編程語言。編寫Spark程序比編寫Hadoop MapReduce程序要簡單的多,SparK提供了Spark-Shell,可以在Spark-Shell測試程序。寫SparK程序的一般步驟就是創(chuàng)建或使用(SparkContext)實例,使用SparkContext創(chuàng)建RDD,然后就是對RDD進行操作。
Java:Spark支持Java編程,但對于使用Java就沒有了Spark-Shell這樣方便的工具,其它與Scala編程是一樣的,因為都是JVM上的語言,Scala與Java可以互操作,Java編程接口其實就是對Scala的封裝。如:
Python:現(xiàn)在Spark也提供了Python編程接口,Spark使用py4j來實現(xiàn)python與java的互操作,從而實現(xiàn)使用python編寫Spark程序。Spark也同樣提供了pyspark,一個Spark的python shell,可以以交互式的方式使用Python編寫Spark程序。
Spark生態(tài)系統(tǒng)
Shark ( Hive on Spark): Shark基本上就是在Spark的框架基礎上提供和Hive一樣的H iveQL命令接口,為了最大程度的保持和Hive的兼容性,Shark使用了Hive的API來實現(xiàn)query Parsing和 Logic Plan generation,最后的PhysicalPlan execution階段用Spark代替Hadoop MapReduce。通過配置Shark參數(shù),Shark可以自動在內(nèi)存中緩存特定的RDD,實現(xiàn)數(shù)據(jù)重用,進而加快特定數(shù)據(jù)集的檢索。同時,Shark通過UDF用戶自定義函數(shù)實現(xiàn)特定的數(shù)據(jù)分析學習算法,使得SQL數(shù)據(jù)查詢和運算分析能結合在一起,最大化RDD的重復使用。
Spark streaming: 構建在Spark上處理Stream數(shù)據(jù)的框架,基本的原理是將Stream數(shù)據(jù)分成小的時間片斷(幾秒),以類似batch批量處理的方式來處理這小部分數(shù)據(jù)。Spark Streaming構建在Spark上,一方面是因為Spark的低延遲執(zhí)行引擎(100ms+)可以用于實時計算,另一方面相比基于Record的其它處理框架(如Storm),RDD數(shù)據(jù)集更容易做高效的容錯處理。此外小批量處理的方式使得它可以同時兼容批量和實時數(shù)據(jù)處理的邏輯和算法。方便了一些需要歷史數(shù)據(jù)和實時數(shù)據(jù)聯(lián)合分析的特定應用場合。
Bagel: Pregel on Spark,可以用Spark進行圖計算,這是個非常有用的小項目。Bagel自帶了一個例子,實現(xiàn)了Google的PageRank算法。
Spark的適用場景
Spark是基于內(nèi)存的迭代計算框架,適用于需要多次操作特定數(shù)據(jù)集的應用場合。需要反復操作的次數(shù)越多,所需讀取的數(shù)據(jù)量越大,受益越大,數(shù)據(jù)量小但是計算密集度較大的場合,受益就相對較小
由于RDD的特性,Spark不適用那種異步細粒度更新狀態(tài)的應用,例如web服務的存儲或者是增量的web爬蟲和索引。就是對于那種增量修改的應用模型不適合。
總的來說Spark的適用面比較廣泛且比較通用。
在業(yè)界的使用
Spark項目在2009年啟動,2010年開源, 現(xiàn)在使用的有:Berkeley, Princeton, Klout, Foursquare, Conviva, Quantifind, Yahoo! Research & others, 淘寶等,豆瓣也在使用Spark的python克隆版Dpark。
騰訊?廣點通是最早使用Spark的應用之一。騰訊大數(shù)據(jù)精準推薦借助Spark快速迭代的優(yōu)勢,圍繞“數(shù)據(jù)+算法+系統(tǒng)”這套技術方案,實現(xiàn)了在“數(shù)據(jù)實時采集、算法實時訓練、系統(tǒng)實時預測”的全流程實時并行高維算法,最終成功應用于廣點通pCTR投放系統(tǒng)上,支持每天上百億的請求量。
基于日志數(shù)據(jù)的快速查詢系統(tǒng)業(yè)務構建于Spark之上的Shark,利用其快速查詢以及內(nèi)存表等優(yōu)勢,承擔了日志數(shù)據(jù)的即席查詢工作。在性能方面,普遍比Hive高2-10倍,如果使用內(nèi)存表的功能,性能將會比Hive快百倍。Yahoo?Yahoo將Spark用在Audience Expansion中的應用。Audience Expansion是廣告中尋找目標用戶的一種方法:首先廣告者提供一些觀看了廣告并且購買產(chǎn)品的樣本客戶,據(jù)此進行學習,尋找更多可能轉化的用戶,對他們定向廣告。Yahoo采用的算法是logisticregression。同時由于有些SQL負載需要更高的服務質量,又加入了專門跑Shark的大內(nèi)存集群,用于取代商業(yè)BI/OLAP工具,承擔報表/儀表盤和交互式/即席查詢,同時與桌面BI工具對接。目前在Yahoo部署的Spark集群有112臺節(jié)點,9.2TB內(nèi)存。
淘寶?阿里搜索和廣告業(yè)務,最初使用Mahout或者自己寫的MR來解決復雜的機器學習,導致效率低而且代碼不易維護。淘寶技術團隊使用了Spark來解決多次迭代的機器學習算法、高計算復雜度的算法等。將Spark運用于淘寶的推薦相關算法上,同時還利用Graphx解決了許多生產(chǎn)問題,包括以下計算場景:基于度分布的中樞節(jié)點發(fā)現(xiàn)、基于最大連通圖的社區(qū)發(fā)現(xiàn)、基于三角形計數(shù)的關系衡量、基于隨機游走的用戶屬性傳播等。
優(yōu)酷土豆?優(yōu)酷土豆在使用Hadoop集群的突出問題主要包括:第一是商業(yè)智能BI方面,分析師提交任務之后需要等待很久才得到結果;第二就是大數(shù)據(jù)量計算,比如進行一些模擬廣告投放之時,計算量非常大的同時對效率要求也比較高,最后就是機器學習和圖計算的迭代運算也是需要耗費大量資源且速度很慢。
最終發(fā)現(xiàn)這些應用場景并不適合在MapReduce里面去處理。通過對比,發(fā)現(xiàn)Spark性能比MapReduce提升很多。首先,交互查詢響應快,性能比Hadoop提高若干倍;模擬廣告投放計算效率高、延遲?。ㄍ琱adoop比延遲至少降低一個數(shù)量級);機器學習、圖計算等迭代計算,大大減少了網(wǎng)絡傳輸、數(shù)據(jù)落地等,極大的提高的計算性能。目前Spark已經(jīng)廣泛使用在優(yōu)酷土豆的視頻推薦(圖計算)、廣告業(yè)務等。京東?應用于京東云海項目,集成MQ和Kafka, 基于Spark Streaming進行實時計算,輸出到HBase
網(wǎng)易?在網(wǎng)易大數(shù)據(jù)平臺中,數(shù)據(jù)存儲在HDFS之后,提供Hive的數(shù)據(jù)倉庫計算和查詢,要提高數(shù)據(jù)處理的性能并達到實時級別,網(wǎng)易公司采用的是Impala和Shark結合的混合實時技術。Cloudera Impala是基于Hadoop的實時檢索引擎開源項目,其效率比Hive提高3-90倍,其本質是Google Dremel的模仿,但在SQL功能上青出于藍勝于藍。Shark是基于Spark的SQL實現(xiàn),Shark可以比 Hive 快40倍(其論文所描述), 如果執(zhí)行機器學習程序,可以快 25倍,并完全和Hive兼容
百度?據(jù)說百度是國內(nèi)規(guī)模最大的Spark集群的運營者——實際生產(chǎn)環(huán)境,最大單集群規(guī)模1300臺(包含數(shù)萬核心和上百TB內(nèi)存),公司內(nèi)部同時還運行著大量的小型Spark集群。
當前百度的Spark集群由上千臺物理主機(數(shù)萬Cores,上百TBMemory)組成,日提交App在數(shù)百,已應用于鳳巢、大搜索、直達號、百度大數(shù)據(jù)等業(yè)務。之以選擇Spark,甄鵬總結了三個原因:快速高效、API 友好易用和組件豐富。同時百度開放云還提供Spark集群計算服務,BMR中的Spark同樣隨用隨起,集群空閑即銷毀,幫助用戶節(jié)省預算。此外,集群創(chuàng)建可以在3到5分鐘內(nèi)完成,包含了完整的Spark+HDFS+YARN堆棧。同時,BMR也提供Long Running模式,并有多種套餐可選。大眾點評?2013年在建立了公司主要的大數(shù)據(jù)架構后,他們上線了HBase的應用,并引入Spark/Shark以提高Ad Hoc Query的執(zhí)行時間,并調研分布式日志收集系統(tǒng),來取代手工腳本做日志導入。