2016-08-24 92 views
1

我具有與每個40GB存儲器的四個節點Hadoop集羣(MAPR)。我需要在大數據集的一個字段上「應用」一個函數(500萬行)。我的代碼的流程是,我讀了蜂巢表中的數據作爲一個火花數據幀和應用上的一列所需的功能如下:地圖變換性能火花數據幀VS RDD

schema = StructType([StructField("field1", IntegerType(), False), StructField("field2", StringType(), False),StructField("field3", FloatType(), False)]) 
udfCos = udf(lambda row: function_call(row), schema) 
result = SparkDataFrame.withColumn("temp", udfCos(stringArgument)) 

類似的RDD版本可能像下面這樣:

result = sparkRDD.map(lambda row: function_call(row)) 

我想提高這段代碼的性能,我確保代碼以最大並行度和降低的吞吐量運行 - 我需要幫助使用SparkConf中的'重新分配''並行性值之類的火花概念'或其他方法,在我的問題的背景下。任何幫助表示讚賞。

我的火花啓動參數:

MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 10 --driver-cores 10 --driver-memory 30g --executor-memory 7g --executor-cores 5 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="150" 
+0

對於初學者不要使用Python的UDF。 – zero323

+0

該建議背後的任何具體推理?我沒有這導致對RDD地圖和UDF(與默認值) – Mike

+2

一般這種往返JVM同一運行時持續時間的樣品測試 - >的Python - > JVM是昂貴的和相對慢的,並有一些其他難看特性(特別是在火花<2 ),所以如果你可以的話,你應該更喜歡原生的(JVM)函數組合在UDF上。 – zero323

回答

0

對於調整你的應用程序,你需要知道一些事情

1)你需要監視你的應用集羣是利用不足或沒有太多的資源如何通過已創建

監控可以使用各種工具如做你的應用程序中使用。 Ganglia從Ganglia你可以找到CPU,內存和網絡使用情況。根據觀察有關CPU和內存使用

2)你可以得到一個更好的主意,需要什麼樣的調整,爲您的應用程序,你

形式星火點

在火花defaults.conf

您可以指定需要什麼樣的序列化您的應用程序需要多少驅動程序內存和執行程序內存,即使您可以更改垃圾回收算法。

下面是幾個例子,你可以調整根據您的要求

spark.serializer     org.apache.spark.serializer.KryoSerializer 
spark.executor.extraJavaOptions -XX:MaxPermSize=2G -XX:+UseG1GC 
spark.driver.extraJavaOptions -XX:MaxPermSize=6G -XX:+UseG1GC 

瞭解更多詳情這個參數是指http://spark.apache.org/docs/latest/tuning.html