2016-10-02 91 views
1

我有兩個數據集,每個數據集都有兩個元素。 下面是一些例子。如何通過scala中的鍵加入兩個數據集spark

數據1:(名稱,動物)

('abc,def', 'monkey(1)') 
('df,gh', 'zebra') 
... 

數據2:(姓名,水果)

('a,efg', 'apple') 
('abc,def', 'banana(1)') 
... 

結果預期:(姓名,動物,水果)

('abc,def', 'monkey(1)', 'banana(1)') 
... 

我想要通過使用第一列「名稱」來加入這兩個數據集。我試圖做幾個小時,但我無法弄清楚。誰能幫我?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]") 
val sc = new SparkContext(sparkConf) 
val text1 = sc.textFile(args(0)) 
val text2 = sc.textFile(args(1)) 

val joined = text1.join(text2) 

上面的代碼不工作!

+0

你在哪裏拆分輸入文本到'(鍵,值)'元組? – maasg

+0

你會得到什麼樣的錯誤?它告訴你什麼? – maasg

+0

@maasg它說''無法解析符號連接。' – tobby

回答

1

join上對RDDS定義的,即,類型的RDDS結果RDD[(K,V)]。 需要的第一步是將輸入數據轉換爲正確的類型。

我們首先需要String類型的原始數據轉化爲對(Key, Value)

val parse:String => (String, String) = s => { 
    val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r 
    s match { 
    case regex(k,v) => (k,v) 
    case _ => ("","") 
    } 
} 

(請注意,由於鍵包含逗號,我們不能用一個簡單的split(",")表達式)

然後我們使用該函數來解析文本輸入數據:

val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')") 
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')") 

val rdd1 = sparkContext.parallelize(s1) 
val rdd2 = sparkContext.parallelize(s2) 

val kvRdd1 = rdd1.map(parse) 
val kvRdd2 = rdd2.map(parse) 

最後,我們使用join方法來連接兩個RDDS

val joined = kvRdd1.join(kvRdd2) 

//讓我們看看結果

joined.collect 

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1)))) 
+0

非常感謝! – tobby

+0

我還有一個問題。我怎樣才能在數據中保留單引號? – tobby

+1

@tobby更改正則表達式以保留引號。 – maasg

0

您必須首先爲您的數據集創建pairRDD,然後您必須應用連接轉換。您的數據集看起來不準確。

請考慮下面的例子。

**Dataset1** 

a 1 
b 2 
c 3 

**Dataset2** 

a 8 
b 4 

您的代碼應該像下面Scala中

val pairRDD1 = sc.textFile("/path_to_yourfile/first.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val pairRDD2 = sc.textFile("/path_to_yourfile/second.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val joinRDD = pairRDD1.join(pairRDD2) 

    joinRDD.collect 

下面是從階殼

res10: Array[(String, (String, String))] = Array((a,(1,8)), (b,(2,4))) 
相關問題