我試圖運行在蜂巢查詢:阿帕奇星火 - 蜂巢內部聯接,LIMIT和定製UDF
這裏是最簡單的設置(我知道我可以做一個=,但使用自定義UDF它做更多的IM不僅僅是一個相等比較)
數據集a和b是約30,000行,每行
SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5
其中custom_UDF_Equals_Comparison簡單地做a.id = b.id
之間的相等性檢查當我運行這個查詢時,我可以在我的日誌輸出中看到很多m/r任務正在運行,假設它在兩個數據集之間進行比較,直到比較所有可能的排列,並且遠高於5的極限(我會因爲我知道大多數數據可以在每個表的前幾行中加入,所以只需要少量m/r任務),爲什麼會發生這種情況?和/或我該如何解決?
編輯:
喜zero323,這是一個類似的問題,但不完全一樣,它解釋了爲什麼使用UDF進行比較時執行2之間RDD的一個全面的比較,但它不解釋爲什麼限制不停止比較時發現5的限制。例如,如果在前10次加入嘗試中找到5行,爲什麼還會進行剩餘的30,000 * 30,000次嘗試。是否由於在所有連接發生後都應用限制的事實?例如它加入30,000 * 30,000行,然後將它們減少到5?
EDIT2:
def levenshtein(str1: String, str2: String): Int = {
val lenStr1 = str1.length
val lenStr2 = str2.length
val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1)
for (i <- 0 to lenStr1) d(i)(0) = i
for (j <- 0 to lenStr2) d(0)(j) = j
for (i <- 1 to lenStr1; j <- 1 to lenStr2) {
val cost = if (str1(i - 1) == str2(j-1)) 0 else 1
d(i)(j) = min(
d(i-1)(j ) + 1, // deletion
d(i )(j-1) + 1, // insertion
d(i-1)(j-1) + cost // substitution
)
}
d(lenStr1)(lenStr2)
}
def min(nums: Int*): Int = nums.min
def join_views(joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = {
if (joinType == "Equals") {
if (col1 == null || col2 == null) {
return false
}
return col1 == col2
}
else if (joinType == "Fuzzy_String") {
if (col1 == null || col2 == null) {
return false
}
val val1 = col1.asInstanceOf[String]
val val2 = col2.asInstanceOf[String]
val ratio = Utils.distancePercentage(val1, val2)
if (ratio == 1.0) {
return val1 == val2
}
return (ratio >= parameters.asInstanceOf[Double])
}
return false;
}
... ON join_views( 「Fuzzy_String」, 「0.1」,a.col1,b.col1)LIMIT 5 = 20secs
... ON join_views(「Fuzzy_String」,「0.9」,a.col1,b.col1)LIMIT 5 = 100secs
能依然關閉,感謝您的幫助 –
我居然發現了一些令人費解的,我custom_UDF也做了模糊檢查,當我運行與0.1模糊值必須匹配,它非常快速的回報加入結果(例如,它匹配到5行非常快並返回),當我將它設置爲0.9必須匹配它需要類似於UDF中的原始=比較。我想知道爲什麼0.1 DOES的模糊匹配回報更快?不一定是個問題,只是一個觀察 –
這實際上很有趣。你能分享一些實施細節嗎? – zero323