2016-08-24 109 views
2

我試圖運行在蜂巢查詢:阿帕奇星火 - 蜂巢內部聯接,LIMIT和定製UDF

這裏是最簡單的設置(我知道我可以做一個=,但使用自定義UDF它做更多的IM不僅僅是一個相等比較)

數據集a和b是約30,000行,每行

SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5

其中custom_UDF_Equals_Comparison簡單地做a.id = b.id

之間的相等性檢查

當我運行這個查詢時,我可以在我的日誌輸出中看到很多m/r任務正在運行,假設它在兩個數據集之間進行比較,直到比較所有可能的排列,並且遠高於5的極限(我會因爲我知道大多數數據可以在每個表的前幾行中加入,所以只需要少量m/r任務),爲什麼會發生這種情況?和/或我該如何解決?

編輯:

喜zero323,這是一個類似的問題,但不完全一樣,它解釋了爲什麼使用UDF進行比較時執行2之間RDD的一個全面的比較,但它不解釋爲什麼限制不停止比較時發現5的限制。例如,如果在前10次加入嘗試中找到5行,爲什麼還會進行剩餘的30,000 * 30,000次嘗試。是否由於在所有連接發生後都應用限制的事實?例如它加入30,000 * 30,000行,然後將它們減少到5?

EDIT2:

def levenshtein(str1: String, str2: String): Int = { 
val lenStr1 = str1.length 
val lenStr2 = str2.length 

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1) 

for (i <- 0 to lenStr1) d(i)(0) = i 
for (j <- 0 to lenStr2) d(0)(j) = j 

for (i <- 1 to lenStr1; j <- 1 to lenStr2) { 
    val cost = if (str1(i - 1) == str2(j-1)) 0 else 1 

    d(i)(j) = min(
    d(i-1)(j ) + 1,  // deletion 
    d(i )(j-1) + 1,  // insertion 
    d(i-1)(j-1) + cost // substitution 
) 
} 

d(lenStr1)(lenStr2) 

}

def min(nums: Int*): Int = nums.min 

def join_views(joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = { 
if (joinType == "Equals") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    return col1 == col2 
} 
else if (joinType == "Fuzzy_String") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    val val1 = col1.asInstanceOf[String] 
    val val2 = col2.asInstanceOf[String] 

    val ratio = Utils.distancePercentage(val1, val2) 

    if (ratio == 1.0) { 
    return val1 == val2 
    } 

    return (ratio >= parameters.asInstanceOf[Double]) 
} 

return false; 

}

... ON join_views( 「Fuzzy_String」, 「0.1」,a.col1,b.col1)LIMIT 5 = 20secs

... ON join_views(「Fuzzy_String」,「0.9」,a.col1,b.col1)LIMIT 5 = 100secs

+0

能依然關閉,感謝您的幫助 –

+0

我居然發現了一些令人費解的,我custom_UDF也做了模糊檢查,當我運行與0.1模糊值必須匹配,它非常快速的回報加入結果(例如,它匹配到5行非常快並返回),當我將它設置爲0.9必須匹配它需要類似於UDF中的原始=比較。我想知道爲什麼0.1 DOES的模糊匹配回報更快?不一定是個問題,只是一個觀察 –

+0

這實際上很有趣。你能分享一些實施細節嗎? – zero323

回答

1

所以這裏有三個不同的問題:

  • 星火優化利用散列和排序連接,以便這些優化僅適用於同等聯接。其他類型的連接,包括取決於UDF的連接需要成對比較,因此需要笛卡爾積。詳情請參閱Why using a UDF in a SQL query leads to cartesian product?
  • 數據移動後的限制操作,特別是混洗,無法完全優化。您可以在Sun Rui提供的nice answerTowards limiting the big RDD中找到一個很好的解釋。

    由於缺乏隨機播放,您的情況相當簡單,但您仍然必須將分區放在一起。

  • 限制優化基於分區而不是記錄。 Spark開始檢查第一個分區,並且如果滿足條件的元素數量少於所需的數量,它會迭代每次迭代所需的分區數量增加(據我記得的因子是4)。如果你正在尋找一個罕見的事件,這可以增加很快。