我在星火新手操作(我的版本是1.6.0),現在我試圖解決如下問題:如何執行「查找」關於星火dataframes給出多個條件
假設有是兩個源文件:
- 第一個(簡稱A)是一個大的包含名爲A1,B1,C1和其他80列的列。裏面有23萬條記錄。
- 第二個(簡稱B)是一個小查找表,其中包含名爲A2,B2,C2和D2的列。裏面有250條記錄。
現在我們需要插入一個新的柱分爲A,下面給出的邏輯:
- 第一查找A1,B1和(對應列是A2,B2和C2)C1在B,如果成功的,返回D2作爲新添加的列的值。如果沒有發現...
- 然後在B中查找A1,B1。如果成功,則返回D2。如果沒有發現任何...
- 設置默認值「NA」
我已經閱讀文件和它們改建爲數據幀。對於第一種情況,我通過將左邊的外部結合在一起得到了結果。但我在下一步找不到好的方法。
我目前的嘗試是通過使用不太嚴格的條件連接A和B來構建新的數據幀。但我不知道如何從另一個更新當前數據幀。或者還有其他更直觀,更有效的方法來解決整個問題嗎?
感謝您的所有答案。
----------------------------- 20160309更新-------------- ------------------
最後接受@mlk的答案。還是非常感謝@ zero323對UDF和加入的評論,鎢代碼的生成實際上是我們現在面臨的另一個問題。但是,因爲我們需要爲每一個查詢做查詢的分數和平均4個條件,前者的解決方案更適合...
最終的解決方案是某種看起來像下面片段:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
(aStr: String, bStr: String, cStr: String) =>
tableBroadcast.value.find {
case TableType(a, b, c, _) =>
(a == aStr && b == bStr && c == cStr) ||
(a == aStr && b == bStr)
}.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
這可能是要走的路。我也用'連接'提供了一個替代解決方案。 – zero323
謝謝mlk。如果查找表很大(500K * 50),播放它還是不錯的? –
我的另一個問題是,假設我需要在不同的列上進行30次查找,並編寫50個UDF,性能是否會受到影響? –