2015-10-20 42 views
-2

我想創建一個函數,稍後可以使用三個不同的RDD數據集。 函數採用鍵和值,並將其轉換到SEQ [字符串]創建spark函數,接受key,value作爲argumets並返回RDD [string]?

def ConvertToMap2(value: RDD[(String, (String,String,String,String,String,String))]): Seq[String] = { 
    value.collect().toMap.values.toSeq.map(x => x.toString.replace("(","").replace(")","")) 
} 

當我試圖通過一個數據應用設置它的確定,因爲它與6個值例如一個鍵: -

val StatusRDD=ConvertToMap(FilterDataSet("1013").map(x => ((x(5)+x(4)),(x(5),x(4),x(1),x(6),x(7),x(8))))) 

但我嘗試應用另一個數據集,我需要我們編寫函數,因爲其他數據集包含7個值,其中一個鍵用於重寫相同邏輯但名稱不同的函數。

def ConvertToMap2(value: RDD[(String,(String,String,String,String,String,String,String))]): Seq[String] = { 
    value.collect().toMap.values.toSeq.map(x => x.toString.replace("(","").replace(")","")) 
} 

val LuldRDD2=ConvertToMap2(FilterDataSet("1041").map(x => ((x(5)+x(4)),(x(5),x(4),x(1),x(6),x(7),x(8),x(9))))) 

有沒有一種方法可以爲兩者編寫一個函數,它只接受一個鍵的6或7個字符串值?還是我可以擴展我的功能?

回答

2

TupleX類從Product繼承的數據類型,所以我會這樣定義函數:

def convertToSeq(rdd: RDD[(String, Product)]): Seq[String] = { 
    rdd.values.map(x => x.productIterator.mkString).collect().toSeq 
} 

注意TupleX類有productIterator我在這裏用來創建字符串(我發現你的方式有點冗長,更難以閱讀),我也推遲collect調用,直到轉換後值,所以地圖操作是並行運行的。

最後,我更改了函數的名稱,因爲它轉換爲Seq而不是Map

0

沒錯去的答案需要使用任何

def ConvertToMap (value: RDD[(String,Any)]): Seq[String] = { 
    value.collect().toMap.values.toSeq.map(x => x.toString.replace("(","").replace(")","")) 
} 
相關問題