2016-04-26 354 views
3

我正在練習Spark殼中的排序。我有一個約10列/變量的rdd。我想在第7欄如何拆分Spark rdd Array [(String,Array [String])]?

rdd 
org.apache.spark.rdd.RDD[Array[String]] = ... 

的值全RDD從我所收集做到這一點的方法是使用sortByKey,這反過來僅適用於對排序。所以我映射,所以我想有一雙由column7(字符串值)的和完整的原始RDD(字符串數組)

rdd2 = rdd.map(c => (c(7),c)) 
rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] = ... 

我再申請sortByKey,仍然沒有問題... ...

rdd3 = rdd2.sortByKey() 
rdd3: org.apache.spark.rdd.RDD[(String, Array[String])] = ... 

但是現在如何從rdd3(Array [String])拆分,收集並保存已排序的原始rdd?每當我嘗試拆分rdd3它給我一個錯誤:

val rdd4 = rdd3.map(_.split(',')(2)) 
<console>:33: error: value split is not a member of (String, Array[String]) 

我在做什麼錯在這裏?是否有其他更好的方法來對其某一列上的rdd進行排序?

+1

正是你想要的我不明白。你的意思是你想分割Array [String]中的每個字符串? – jtitusj

+2

你試圖拆分Tuple,這就是爲什麼它是錯誤的原因 –

+1

@John不,我想拆分rdd3(一對已排序的第7列和原始rdd),所以我會將原始rdd返回,但仍然按第7列排序...而沒有真正將第7列作爲前綴(如rdd3中)。我稍微編輯了這個問題,現在更清楚了嗎? –

回答

2

rdd2 = rdd.map(c => (c(7),c))做了什麼是映射它到一個元組。 rdd2: org.apache.spark.rdd.RDD[(String, Array[String])] 就像它說的:)。 現在如果你想分割記錄,你需要從這個元組中獲得它。 你可以再次映射,只取元組的第二部分(這是數組的數組[字符串] ...),像這樣:rdd3.map(_._2)

,但我強烈建議使用try rdd.sortBy(_(7))或這類東西。這樣你就不需要用元組等這樣的麻煩了。

+0

我試過了你對rdd.sortBy(_。=> _。7)的建議,但是卻發現「error:identifier expected but'=>'found」。你可以編輯,所以我可以接受你的答案?正如你所建議的那樣,rdd3.map(_._ 2)也完成了這項工作,但需要更多的工作。保留'_'(下劃線...) –

+0

'因爲rdd中的元素有一個數組結構,所以t和'.sortBy(_._ 7)'一樣工作。 @KoenDeCouck,我發佈了我的答案。你可能想檢查一下。 :) – jtitusj

+0

它沒有,然而@約翰提圖斯Jungao的答案有解決方案:rdd.sortBy(_(7))。我會接受這個答案,因爲這個問題集中在分裂問題上,並且你提供了一些關於爲什麼不起作用的信息。 –

0

我還以爲你不熟悉斯卡拉, 所以,下面應該幫助您瞭解更多,

rdd3.map(kv => { 
    println(kv._1) // This represent String 
    println(kv._2) // This represent Array[String] 
}) 
1

只是這樣做:

val rdd4 = rdd3.map(_._2) 
2

,如果你想使用7號字符串數組排序的RDD,你可以做到這一點直接

rdd.sortBy(_(6)) // array starts at 0 not 1 

rdd.sortBy(arr => arr(6)) 

,將節省您做的所有的麻煩多重轉換。之所以rdd.sortBy(_._7)rdd.sortBy(x => x._7)不會工作是因爲這不是你如何訪問數組中的元素。要訪問數組的第7個元素,說arr,你應該做arr(6)

爲了測試這一點,我做了以下內容:

val rdd = sc.parallelize(Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd"))) 

// I want to sort it using the 3rd String 
val sorted_rdd = rdd.sortBy(_(2)) 

這裏的結果:

Array(Array("ard", "bas", "wer"), Array("csg", "dip", "hwd"), Array("asg", "qtw", "hasd")) 
+0

謝謝約翰!這種解決方案看起來是更好的分類方法。我會接受Zahiro的答案,但是由於問題的措辭方式,附上您的解決方案。 (Upvoted this) –

+0

我upvoted too .. :) –