2016-03-01 51 views
6

我在星火新手操作(我的版本是1.6.0),現在我試圖解決如下問題:如何執行「查找」關於星火dataframes給出多個條件

假設有是兩個源文件:

  • 第一個(簡稱A)是一個大的包含名爲A1,B1,C1和其他80列的列。裏面有23萬條記錄。
  • 第二個(簡稱B)是一個小查找表,其中包含名爲A2,B2,C2和D2的列。裏面有250條記錄。

現在我們需要插入一個新的柱分爲A,下面給出的邏輯:

  • 第一查找A1,B1和(對應列是A2,B2和C2)C1在B,如果成功的,返回D2作爲新添加的列的值。如果沒有發現...
  • 然後在B中查找A1,B1。如果成功,則返回D2。如果沒有發現任何...
  • 設置默認值「NA」

我已經閱讀文件和它們改建爲數據幀。對於第一種情況,我通過將左邊的外部結合在一起得到了結果。但我在下一步找不到好的方法。

我目前的嘗試是通過使用不太嚴格的條件連接A和B來構建新的數據幀。但我不知道如何從另一個更新當前數據幀。或者還有其他更直觀,更有效的方法來解決整個問題嗎?

感謝您的所有答案。

----------------------------- 20160309更新-------------- ------------------

最後接受@mlk的答案。還是非常感謝@ zero323對UDF和加入的評論,鎢代碼的生成實際上是我們現在面臨的另一個問題。但是,因爲我們需要爲每一個查詢做查詢的分數和平均4個條件,前者的解決方案更適合...

最終的解決方案是某種看起來像下面片段:

``` 
import sqlContext.implicits._ 
import com.github.marklister.collections.io._ 

case class TableType(A: String, B: String, C: String, D: String) 
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("...")) 
val lkupD = udf { 
    (aStr: String, bStr: String, cStr: String) => 
    tableBroadcast.value.find { 
     case TableType(a, b, c, _) => 
     (a == aStr && b == bStr && c == cStr) || 
     (a == aStr && b == bStr) 
    }.getOrElse(TableType("", "", "", "NA")).D 
} 
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C")) 
``` 

回答

4

由於B是小我認爲做到這一點的最佳方式是廣播變量和用戶定義的功能。

// However you get the data... 
case class BType(A2: Int, B2: Int, C2 : Int, D2 : String) 
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200")) 

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER") 


// Broadcast B so all nodes have a copy of it. 
val Bbradcast = sc.broadcast(B) 

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {(a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 } 

// Use the UDF in a select 
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show 
+1

這可能是要走的路。我也用'連接'提供了一個替代解決方案。 – zero323

+0

謝謝mlk。如果查找表很大(500K * 50),播放它還是不錯的? –

+0

我的另一個問題是,假設我需要在不同的列上進行30次查找,並編寫50個UDF,性能是否會受到影響? –

2

只是爲了參考,而不UDF的一個解決方案:

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1")) 
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2")) 

// Match A, B and C 
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1") 
// Match A and B mismatch C 
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2") 

val toDrop = b1.columns ++ b2.columns 

toDrop.foldLeft(a 
    .join(b1, expr1, "leftouter") 
    .join(b2, expr2, "leftouter") 
    // If there is match on A, B, C then D_1 should be not NULL 
    // otherwise we fall-back to D_2 
    .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c)) 

這假設有在每個類別至多一個匹配(所有三列,或前兩個)或重複的行中的輸出是期望。

UDF VS JOIN

有多種因素需要考慮,並沒有簡單的答案在這裏:

缺點

  • 廣播joins需要傳遞數據的兩倍到工人節點。至於broadcasted表格沒有被緩存(SPARK-3863),並且不可能在最近的將來改變(Resolution:Later)。
  • join即使有完全匹配,操作也會應用兩次。

優點

  • join​​3210和是透明的優化而UDF是不。
  • 直接使用SQL表達式操作可以受益於所有的Tungsten優化,包括代碼生成,而UDF不能。