2016-03-07 61 views
0

假設之間有2個RDDSpyspark我怎麼加兩個RDDS用相同的密鑰匹配

其中RDD1 has (key1,key2,value)

RDD2 has (key1, value)

現在我想結合的操作(如+或減)從RDD2到RDD1集key1的地方有一個比賽 這裏例如

RDD1 has [1,1,3],[1,2,2],[2,2,5] 

RDD2 = sc.parallelize([1,1]) 

我想導致

RDD3 to [1,1,4],[1,2,3],[2,2,5] only the first and second data was added while third one wasn't 

我嘗試使用左外連接到找到key1的比賽,並做一些操作,但我會失去那些不需要做手術,有沒有辦法做到在部分數據操作中的數據?

+0

你能澄清一下左外連接的問題嗎? –

回答

1

假設你想配對操作,或者你的數據包含1至0..1關係中,你可以做最簡單的事情是雙方RDDS轉換爲DataFrames

from pyspark.sql.functions import coalesce, lit 

df1 = sc.parallelize([ 
    (1, 1, 3), (1, 2, 2), (2, 2, 5) 
]).toDF(("key1", "key2", "value")) 

df2 = sc.parallelize([(1, 1)]).toDF(("key1", "value")) 

new_value = (
    df1["value"] + # Old value 
    coalesce(df2["value"], lit(0)) # If no match (NULL) take 0 
).alias("value") # Set alias 

df1.join(df2, ["key1"], "leftouter").select("key1", "key2", new_value) 

您可以輕鬆地調整這種通過處理其他場景在加入DataFrames之前在df2上應用聚合。

相關問題