2017-08-01 1183 views
1

我對scala和spark非常陌生,我一直在試圖爲這個問題找到一整天的解決方案 - 它在做我的頭。我嘗試了以下代碼的20種不同變體,並在嘗試對列進行計算時不斷得到type mismatch錯誤。Spark(scala)dataframes - 檢查列中的字符串是否包含集合中的任何項目

我有一個火花數據框,我想檢查一個特定列中的每個字符串是否包含來自預定義的List(或Set)字的任何數量的字。

這裏是複製一些示例數據:

// sample data frame 
val df = Seq(
     (1, "foo"), 
     (2, "barrio"), 
     (3, "gitten"), 
     (4, "baa")).toDF("id", "words") 

// dictionary Set of words to check 
val dict = Set("foo","bar","baaad") 

現在,我想創建一個比較結果的第三列,看看內他們在$"words"列中的字符串包含任何的dict單詞組詞。所以結果應該是:

+---+-----------+-------------+ 
| id|  words| word_check| 
+---+-----------+-------------+ 
| 1|  foo|   true|  
| 2|  bario|   true| 
| 3|  gitten|  false| 
| 4|  baa|  false| 
+---+-----------+-------------+ 

首先,我想看看我是否能本身做沒有使用UDF的,因爲字典設置實際上是一個大字典> 40K字,按照我的理解這個會比UDF更高效:

df.withColumn("word_check", dict.exists(d => $"words".contains(d))) 

,但我得到的錯誤:

type mismatch; 
found : org.apache.spark.sql.Column 
required: Boolean 

我也試圖創建一個UDF做到這一點(同時使用mutable.Setmutable.WrappedArray來形容設置 - 不知道這是正確的,但既沒有工作):

val checker: ((String, scala.collection.mutable.Set[String]) => Boolean) = (col: String, array: scala.collection.mutable.Set[String]) => array.exists(d => col.contains(d)) 

val udf1 = udf(checker) 

df.withColumn("word_check", udf1($"words", dict)).show() 

卻得到另一種類型不匹配:

found : scala.collection.immutable.Set[String] 
required: org.apache.spark.sql.Column 

如果設置爲一個固定的號碼,我應該能夠使用Lit(Int)的表達式?但我不明白在一列上執行更復雜的功能,通過在scala中混合不同的數據類型。

任何幫助非常感謝,特別是如果它可以有效地完成(這是一個大於5米行df)。

回答

1

如果你的字典很大,你不應該只在你的udf中引用它,因爲整個字典是通過網絡發送的每一個任務。我會廣播你的字典與udf結合使用:

import org.apache.spark.broadcast.Broadcast 

def udf_check(words: Broadcast[scala.collection.immutable.Set[String]]) = { 
    udf {(s: String) => words.value.exists(s.contains(_))} 
} 

df.withColumn("word_check", udf_check(sparkContext.broadcast(dict))($"words")) 
5

無論效率的,這似乎工作:

df.withColumn("word_check", dict.foldLeft(lit(false))((a, b) => a || locate(b, $"words") > 0)).show 

+---+------+----------+ 
| id| words|word_check| 
+---+------+----------+ 
| 1| foo|  true| 
| 2|barrio|  true| 
| 3|gitten|  false| 
| 4| baa|  false| 
+---+------+----------+ 
+0

This Works,thank you。但就效率而言,使用40K字典列表可能會非常昂貴。 – renegademonkey

6

這裏是你如何使用UDF做到這一點:

val checkerUdf = udf { (s: String) => dict.exists(s.contains(_)) } 

df.withColumn("word_check", checkerUdf($"words")).show() 

實現流程的錯誤是,你已經創建了一個UDF需要兩個參數,這意味着在應用它時必須通過兩個Column - 但在您的DataFrame中dict不是Column,而是本地可用的。

+0

字典也可以播放它很大 –

+0

aha!這真的能夠澄清事情,並且是一個很好的答案,但是我認爲我可能不得不採用@拉斐爾羅斯的建議,因爲在這種情況下效率將非常重要。 – renegademonkey

相關問題