2016-01-13 87 views
2

我在同一臺機器上有一個Spark集羣和一個Hdfs。 我已經在每臺機器的本地文件系統和hdfs分佈式文件系統上覆制了一個單獨的文本文件,大約爲3Gbytes。Spark本地vs hdfs permormance

我有一個簡單的字數pyspark程序。

如果我提交從本地文件系統讀取文件的程序,它會持續約33秒。 如果我提交從hdfs讀取文件的程序,它持續約46秒。

爲什麼?我期待完全相反的結果。

增加sgvd的請求後:

16奴隸1個主

星火獨立的,沒有特別的設置(複製因子3)

版本1.5.2

import sys 
sys.path.insert(0, '/usr/local/spark/python/') 
sys.path.insert(0, '/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip') 
import os 
os.environ['SPARK_HOME']='/usr/local/spark' 
os.environ['JAVA_HOME']='/usr/local/java' 
from pyspark import SparkContext 
#conf = pyspark.SparkConf().set<conf settings> 


if sys.argv[1] == 'local': 
    print 'Esecuzine in modalita local file' 
    sc = SparkContext('spark://192.168.2.11:7077','Test Local file') 
    rdd = sc.textFile('/root/test2') 
else: 
    print 'Esecuzine in modalita hdfs' 
    sc = SparkContext('spark://192.168.2.11:7077','Test HDFS file') 
    rdd = sc.textFile('hdfs://192.168.2.11:9000/data/test2') 


rdd1 = rdd.flatMap(lambda x: x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y) 
topFive = rdd1.takeOrdered(5,key=lambda x: -x[1]) 
print topFive 
+0

它可以依靠很多東西。你的集羣有多大?你使用什麼集羣管理器?任何自定義設置?什麼Spark版本?你能顯示你的代碼嗎? – sgvd

+0

我在問題的空間回答。 – arj

回答

0

這是因爲如何數據是分佈式的,單個文檔不是一個好的選擇,有幾個更好的選擇,如parquet,如果你這樣做所以您會注意到性能會顯着提高,這是因爲文件分區的方式允許您的羣集將並行讀取這些部分,從而提高性能。

1

它有點直觀,但由於複製因子爲3,並且您有16個節點,因此每個節點平均有20%的數據存儲在本地HDFS中。然後,大約6個工作節點應該足以平均讀取整個文件,而無需任何網絡傳輸。

如果您記錄運行時間與工作人員節點的數量,您應該注意,在大約6之後,從本地FS和HDFS讀取數據不會有任何區別。

上述計算可以使用變量來完成,例如, x=number of worker nodes,y= replication factor,但您可以很容易地看到,由於從本地FS讀取強制該文件位於所有節點上,因此最終使用x=y,並且在使用floor(x/y)節點後沒有差異。這正是你所觀察到的,起初它似乎不符合直覺。你會在生產中使用複製因子100%嗎?

+0

更改人工因素但不包括工人人數不會改變時間。 隨着6工人repfactor 3和6 datanode時間增加到1分30秒。 – arj

+0

你是如何配置的?你重啓了你的集羣嗎?你在描述中說你有16個奴隸。 –

+0

代表事實更改嘗試:我已將文件的代表事實從2更改爲16.程序提交給16個從屬。 節點數嘗試:我已經重新配置整個羣集(火花和hadoop)只有6個節點。 – arj

1

Executor,Driver和RDD特有的參數(關於Spilling和存儲級別)是什麼?

從星火documentation

性能的影響

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.要爲洗牌組織數據,星火產生的任務集 - 地圖的任務來組織數據,以及一組reduce任務彙總吧。這個術語來自MapReduce,並不直接與Spark的地圖和減少操作相關。

某些洗牌操作會消耗大量的堆內存,因爲它們使用內存中的數據結構在傳輸它們之前或之後組織記錄。 Specifically, reduceByKey and aggregateByKey create these structures on the map side, and 'ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection

我對memory/CPU core限制爲Spark Job Vs memory/CPU core限制Map & Reduce任務感興趣。

關鍵參數指標從Hadoop的:

yarn.nodemanager.resource.cpu-vcores 
mapreduce.map.cpu.vcores 
mapreduce.reduce.cpu.vcores 
mapreduce.map.memory.mb 
mapreduce.reduce.memory.mb 
mapreduce.reduce.shuffle.memory.limit.percent 

關鍵參數對Hadoop的基準SPARK params用於在對等。

spark.driver.memory 
spark.driver.cores 
spark.executor.memory 
spark.executor.cores 
spark.memory.fraction 

這些只是一些關鍵參數。看看SPARKMap ReduceMap Reduce的詳細集

沒有正確的參數設置,我們無法比較跨兩種不同技術的作業性能。

相關問題