2017-03-07 78 views
0

我有一個Spark Streaming應用程序正在運行,它使用mapWithState函數來跟蹤RDD的狀態。 應用程序運行罰款幾分鐘,但然後用爲什麼火花工的內存使用量會隨着時間而增加?

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 373 

崩潰我觀察到的星火應用增加了內存使用情況隨時間線性,即使我已經爲mapWithStateRDD超時。請參閱下面的代碼片段和內存使用情況 -

val completedSess = sessionLines 
        .mapWithState(StateSpec.function(trackStateFunction _) 
        .numPartitions(80) 
        .timeout(Minutes(5))) 

enter image description here

爲什麼要增加內存隨時間線性如果每個RDD明確超時?

我試圖增加內存,但沒關係。我錯過了什麼?

編輯 - 代碼參考

高清trackStateFunction(batchTime:時間,關鍵:字符串值:選項[字符串],狀態:國發[(布爾,列表[字符串]龍)]):選項[ (布爾,列表[字符串])] = {

def updateSessions(newLine: String): Option[(Boolean, List[String])] = { 
    val currentTime = System.currentTimeMillis()/1000 

    if (state.exists()) { 
     val newLines = state.get()._2 :+ newLine 

     //check if end of Session reached. 
     // if yes, remove the state and return. Else update the state 
     if (isEndOfSessionReached(value.getOrElse(""), state.get()._4)) { 
     state.remove() 
     Some(true, newLines) 
     } 
     else { 
     val newState = (false, newLines, currentTime) 
     state.update(newState) 
     Some(state.get()._1, state.get()._2) 
     } 
    } 
    else { 
     val newState = (false, List(value.get), currentTime) 
     state.update(newState) 
     Some(state.get()._1, state.get()._2) 
    } 
    } 

    value match { 
    case Some(newLine) => updateSessions(newLine) 
    case _ if state.isTimingOut() => Some(true, state.get()._2) 
    case _ => { 
     println("Not matched to any expression") 
     None 
    } 
    } 
} 
+1

您有多少傳入流量?多少RAM /磁盤?我們需要更多信息。 –

+1

另外,您有多久檢查一次? –

+1

我有一個由4名工作人員組成的集羣(8個內核,32 GB RAM,每個128 GB SSD)。來自Kinesis Stream的傳入流量爲10-15 MB/s。批處理間隔爲10秒。檢查點間隔爲60s – cmbendre

回答

1

根據mapwithstate的信息: 國家規範 初始狀態爲RDD - 你可以從一些賣場加載初始狀態,然後開始新的數據流作業與那個狀態。

分區數量 - 鍵值狀態dstream通過鍵進行分區。如果您之前對狀態的大小有很好的估計,則可以提供分區的數量來相應地對其進行分區。

分區程序 - 您還可以提供自定義分區程序。默認分區程序是散列分區程序。如果您對密鑰空間有很好的理解,那麼您可以提供一個自定義分區器,它可以執行比默認散列分區程序更高效的更新。

超時 - 這將確保其值未在特定時間段內更新的鍵將從狀態中移除。這可以幫助清理舊鍵的狀態。

所以,超時只能用一段時間後清理,而沒有更新的鍵。內存將運行完整並最終阻塞,因爲執行程序沒有分配足夠的內存。這給了MetaDataFetchFailed異常。隨着記憶力的增加,我希望你的意思是執行者。即使增加執行程序的內存可能也不起作用,因爲該流仍在繼續。使用MapWithState,會話線將包含與輸入dstream相同的記錄數。所以要解決這個問題就是讓你的dstream更小。在流媒體方面,你可以設置一個批次的時間間隔,這將最有可能解決這個

VAL SSC =新的StreamingContext(SC,秒(batchIntervalSeconds))

記得在同時做一次快照和檢查點。快照將允許您使用來自之前丟失的流的信息進行其他計算。希望這有助於瞭解更多信息,請參閱:https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlhttp://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/

相關問題