2016-04-27 66 views
2

我面對陌生的行爲,而使用火花與一個主站和HDFS上的三個工作節點救了我的解析XML文件,問題是星火將結果保存到HDFS

當我解析XMLFILE,並試圖在HDFS保存接檔無法保存所有解析結果。

,當我通過指定

sc = SparkContext("local", "parser") 

and the spark-submit will be ./bin/spark-submit xml_parser.py 

執行本地模式相同的代碼,這個運行提供了與完整的記錄HDFS 117MB解析的文件。

,並在火花客戶端模式下執行代碼,然後我做了以下的情況下,

sc = SparkContext("spark://master:7077", "parser") 

和火花提交的,

./bin/spark-submit --master yarn-client --deploy-mode client --driver-memory 7g --executor-memory 4g --executor-cores 2 xml_parser.py 1000 

給我的不完全記錄HDFS 19MB文件。

保存結果在這兩種情況下,我使用rdd.saveAsTextFile( 「HDFS://」)

我使用spark1.6.1-hadoop2.6 和Apache的Hadoop 2.7.2

任何人都可以幫助我。我不明白爲什麼它會發生。 我有以下sparkCluster,

1主8GbRAM

2- workerNode1 8GbRAM

3- WorkerNode2 8GbRAM

4- workerNode3 8GbRAM

和我上面簇上配置Hadoop的2.7。2 1個主3的DataNode,

如果我太平紳士在severNode給我,

24097碩士

21652 JPS

23398的NameNode

23799 ResourceManager的

23630 SecondaryNameNode

JPS上的所有的DataNodes,

8006工人

7819節點管理器

27164 JPS

7678的DataNode

通過檢查HadoopNameNode UI主:9000給我的三米現場的DataNodes

通過在主人上檢查SparkMaster Ui:7077給我三名現場工作人員

請有看這裏,

sc = SpakContext("spark://master:7077", "parser") 
-------------------------------------------- 
contains the logic of XMLParsing 
-------------------------------------------- 
and I am appending the result in one list like, 
cc_list.append([final_cll, Date,Time,int(cont[i]), float(values[i]),0]) 
Now I am Parallelizing the above cc_list like 
parallel_list = sc.parallelize(cc_list) 
parallel_list.saveAsTextFile("hdfs://master:9000/ some path") 
Now I am Doing some operations here. 
new_list = sc.textFile("hdfs://localhost:9000/some path/part-00000).map(lambda line:line.split(',')) 

result = new_list.map(lambda x: (x[0]+', '+x[3],float(x[4]))).sortByKey('true').coalesce(1) 
result = result.map(lambda x:x[0]+','+str(x[1])) 
result = result.map(lambda x: x.lstrip('[').rstrip(']').replace(' ','')).saveAsTextFile("hdfs://master:9000/some path1)) 
+0

你能分享代碼嗎?否則很難理解到底發生了什麼...... – mgaido

+0

整個解析邏輯是在Python中沒有火花轉換和行動我只用了我並行化的列表。 –

回答

1

對不起,這裏這樣的傻瓜問題。其實我發現了兩個問題

1)多個工作運行時,

parallel_list = sc.parallelize(cc_list) 

創建4-5部分文件和parallel_list與部分00000到部分00004,並且在裝載parallel_list保存在HDFS u能在代碼上面看到

new_list = sc.textFile(pathto parallel_list/part-00000) ==> so it was taking only the first part. 

2),而在運行localMode,

parallel_list = sc.parallelize(cc_list) was creating only one part file so i was able to pick whole file at one stroke. 

所以,當工人我拿出兩種解決方案

1)上運行的火花我剛添加的兼職*而從parallel_list

2創建new_list)通過增加spark.akka.frameSize 10000通過--configure spark.akka.frameSize = 1000與火花提交。