2017-08-23 38 views
0

我有一個pyspark RDD(myRDD),該ID的可變長度列表,如pyspark喂一個RDD到另一個使用「在」子句

[['a', 'b', 'c'], ['d','f'], ['g', 'h', 'i','j']] 

我有一個pyspark數據幀(myDF)列IDvalue

我想查詢myDF與查詢:

outputDF = myDF.select(F.collect_set("value")).alias("my_values").where(col("ID").isin(id_list)) 

其中id_list距離myRDD的元素,如[ 'd', 'F']或[ 'A', 'B', 'C']。

一個例子是:

outputDF = myDF.select(F.collect_set("value")).alias("my_values").where(col("ID").isin(['d','f'])) 

什麼是使用RDD查詢DF這樣的並行方式?

回答

0

考慮到您的數據幀列「ID」類型爲stringType(),您希望保留出現在任何RDD行中的ID值。

首先,讓我們將我們的RDD成一列數據框與每一行的唯一ID:

from pyspark.sql import HiveContext 
hc = HiveContext(sc) 
ID_df = hc.createDataFrame(
    myRDD.map(lambda row: [row]), 
    ['ID'] 
).withColumn("uniqueID", psf.monotonically_increasing_id()) 

我們將explose它使每一行只有一個ID值:

import pyspark.sql.functions as psf 
ID_df = ID_df.withColumn('ID', psf.explode(ID_df.ID)) 

我們現在可以加入了原來的數據框,內部聯接將作爲一個過濾器:

myDF = myDF.join(ID_df, "ID", "inner) 

一個collect_set是一個聚合函數,所以你需要這麼一種groupBy使用它,例如通過新創建的行ID之前:

myDF.groupBy("uniqueID").agg(
    psf.collect_set("ID").alias("ID") 
) 
+0

謝謝,@Marie。這有一個例外:如果我餵它[['a','b','c'],['d','f'],我得到元素['a', 'b','c']和元素['d','f']。我在考慮使用「in」子句的原因是因爲它只會返回該特定元素的匹配值。希望創建一個數據幀,如[(['a','b','c'],('v1','v2','v3','v4','v5')],其中v1到v5是所有與ID a,b和c相匹配的值 – Eka

+0

您可以爲每行創建一個唯一的ID以便稍後識別它們我已編輯我的答案 – MaFF

+0

再次感謝@Marie!我改變了最後一行到DF.groupBy(「uniqueID」)。agg(psf.collect_set(「ID」).alias(「ID」),psf.collect_set(「value」).alias(「values」))。drop(「uniqueID」 ),以獲得我想要的表格的最終版本。感謝您的幫助! – Eka