2015-10-19 89 views
0

我想在spark env中使用scala讀取avro數據。我的數據沒有得到分配,而在運行它時只會到2個節點。我們有20多個節點。這是我的代碼段spark中的數據並行:從hdfs讀取avro數據

@serializable情況下類My_Class(VAL添加my_id:字符串)

VAL文件路徑= 「HDFS://路徑」;

val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](filePath) 

val rddprsid = avroRDD.map(A => new My_Class(new String(A._1.datum.get("My_ID").toString()))); 

val uploadFilter = rddprsid.filter(E => E.My_ID ne null); 
val as = uploadFilter.distinct(100).count; 

我不能在rdd上使用並行操作,因爲它會抱怨以下錯誤。 :30:錯誤:類型不匹配; 找到:org.apache.spark.rdd.RDD [(org.apache.avro.mapred.AvroWrapper [org.apache.avro.generic.GenericRecord],org.apache.hadoop.io.NullWritable]] required:Seq [?]

有人可以幫忙嗎?

+0

這是兩個不同的問題,其中第一個沒有足夠的信息。你的火花提交是什麼? –

+0

我使用scala控制檯提交作業,它在紗線客戶端上。 – Akhtar

+0

scala控制檯,你的意思是火星殼? –

回答

0

因爲紗線提交默認爲2。你需要與--num-executors [NUMBER]提交和可選--executor-cores [NUMBER]

至於並行化....你的數據已經被並行化您所看到的只有2個節點......因此,圍繞RDD包裝。 parallelize僅用於跨羣集採用內存數據。

+0

感謝它的工作。 – Akhtar

+0

嗨@Justin Pihony,我試圖讓明智的獨特用戶數,我的程序給了我正確的結果,如果它是單個部分文件。當我爲10個文件運行它時,我相信它會計算每個零件文件的UU並將其修正。你能幫忙嗎? val appMap = avroRDD.map(A =>(new App_userid(A._1.datum.get(「App」)。toString,A._1.datum.get(「user」)。asInstanceOf [Long]),1) ); \t val dist = appMap.reduceByKey(_ + _,500) \t val newmap = dist.map(a =>(a._1.App,1)); \t val newval = newmap.reduceByKey(_ + _); \t val finalResult = newval.collect; – Akhtar

+0

這應該是一個單獨的問題。我原來的回答是處理你原來的問題。刪除接受的支票,因爲你有一個新問題不是如何工作。 –