3

我有一個(相當大,覺得10E7行)數據幀從我基於某些屬性過濾元件分區位置

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldName.myValue) 

我的數據幀有N個分區data.rdd.getNumPartitions

現在我想知道我的行來自哪個分區。我知道我可以通過所有分區重複像這樣的東西

val temp = res.first() //or foreach, this is just an example 
data.foreachPartition(f => { 
    f.exists(row => row.get(0)==temp.get(0)) 
    //my code here 
}) //compare PKs 

data.rdd.mapPartitionsWithIndex((idx, f) => ...)

然而,這似乎過多,也不是很我的結果高性能和我的數據框變大。

在執行filter()操作後有Spark方法嗎?

或者,有沒有一種方法來重寫/一個替代filter() - 語句,以便它返回行的原點?

我也可以保存分區位置,我的數據幀和更新上重新分區,但我寧願做一個火花方式

(唯一類似的問題,我發現了here,既不問題我也發現this這可能是相似的,但不一樣)

在此先感謝您的任何幫助/指針,我很抱歉,如果我錯過了類似於我的問題已被回答。

+0

mapPartitionsWithIndex是一個簡單的地圖操作。它不涉及洗牌,只是分佈式映射。可能有另一種方式,但我不確定它可能比這更真實。 – Marie

回答

0

分區號/計數不穩定,因爲Spark將執行分區減少的自動擴展&。這意味着輸入分區計數可能與輸入文件計數不同。

一般模式在這些情況下是創建基於在每個輸入文件中的數據的一些型複合關鍵的。如果密鑰很大,可以對其進行散列以減小大小。如果您不太在乎碰撞,請使用Murmur3。如果您擔心碰撞,請使用MD5,這仍然很快。

如果只有獨特的功能,你必須是輸入文件的路徑,你就必須添加的文件路徑作爲區分列。這裏是一個辦法做到這一點:

val paths = Seq(...) 
val df = paths 
    .map { path => 
    sqlContext.read.parquet(path) 
     .withColumn("path", lit(path)) 
    } 
    .reduceLeft(_ unionAll _) 

的想法很簡單:讀取輸入文件一次一個,加上與之相關的唯一列,然後使用UNION ALL它們結合在一起。