2017-03-18 50 views
0

的補體我有兩個RDD的: 第一個(用戶ID,MOV ID,評分,時間戳)加入RDD的以導致相交

data_wo_header: RDD[String] 
scala> data_wo_header.take(5).foreach(println) 
1,2,3.5,1112486027 
1,29,3.5,1112484676 
1,32,3.5,1112484819 
1,47,3.5,1112484727 
1,50,3.5,1112484580 

和RDD2(用戶ID,MOV ID)

data_test_wo_header: RDD[String] 
scala> data_test_wo_header.take(5).foreach(println) 
1,2 
1,367 
1,1009 
1,1525 
1,1750 

我需要加入兩個RDD,這樣加入會刪除RDD1中常見的條目(UserID,Mov ID)。 有人可以指導兩個RDD的scala-spark連接。 另外,我需要一個連接,其中從RDD1派生的新RDD只有公共項目。

回答

0

首先轉換您的RDDS到數據幀數據幀,因爲有類似的API常用的SQL如加入,選擇等

要將RDDS轉換成數據幀,你需要一個RDD [行]代替RDD [字符串。

Import sqlContext.implicits._ 

case class cs1(UserID: Int, MovID: Int, Rating: String, Timestamp: String) 

case class cs2(UserID: Int, MovID: Int) 

val df1 = data_wo_header.map(row => { 
    val splits = row.split(",") 

    cs1(splits(0).toInt, splits(1).toInt, splits(2),splits(3)) 
}).toDF("UserID", "MovID", "Rating", "Timestamp") 

val df2 = data_test_wo_header.map(row => { 
    val splits = row.split(",") 

    cs2(splits(0).toInt, splits(1).toInt) 
}).toDF("UserID", "MovID") 

現在,添加一個新列DF2,

val df2Prime = df2.withColumn("isPresent", lit(1)) 

然後左加入df2Prime與DF1,並過濾掉行,其中isPresent是1,你有相交的結果。另外,臨時丟棄isPresent標誌。

val temp = df1.join(df2Prime, usingColumns = Seq("UserID", "MovID"), "left") 

temp.filter(temp("isPresent") =!= "1").drop("isPresent") 
0

超級簡單的方法是使用鍵減法。以下是爲我工作:

val data_wo_header=dropheader(data).map(_.split(",")).map(x=>((x(0),x(1)),(x(2),x(3)))) 
val data_test_wo_header=dropheader(data_test).map(_.split(",")).map(x=>((x(0),x(1)),1)) 
val ratings_train=data_wo_header.subtractByKey(data_test_wo_header) 
val ratings_test=data_wo_header.subtractByKey(ratings_train)