2017-05-26 92 views
1

使用Spark 2.1,我有一個函數,它需要一個DataFrame並檢查所有記錄是否在給定的數據庫(在這種情況下是Aerospike)。火花累加器計數不正確?

它看起來非常像這樣:

def check(df: DataFrame): Long = { 
    val finalResult = df.sparkSession.sparkContext.longAccumulator("finalResult") 
    df.rdd.foreachPartition(iter => { 
     val success = //if record is on the database: 1 else: 0 
     //if success = 0, send Slack message with missing record 
     finalResult.add(success) 
     } 
     df.count - finalResult.value 
    } 

所以,鬆弛的消息的數量應與該函數返回(丟失記錄的總數)的數目,但往往這種情況並非如此 - 例如,我得到一條Slack消息,但是check = 2。重新運行它提供了check = 1

任何想法發生了什麼?

回答

0

對於不同工作人員的相同數據,Spark可以多次運行一個方法,這意味着您要計算每個成功次數*在任何工作人員上處理數據的次數。因此,您可以在累加器中獲得不同通過同一數據的不同結果。

在這種情況下,您無法使用累加器來確切計數。抱歉。 :(

+0

那麼爲什麼我只收到一條Slack消息?如果它被處理了兩次,那麼我應該有兩條消息 – shakedzy

+0

嗯,對不起,不確定那麼當我沒有使用數據幀太多時,它應該是所有的在一個分區上的數據,而不是隻有一個記錄,你確定你的成功只能是1或0嗎?除此之外,我什麼也沒有 – SiLaf

+2

我認爲這是不正確的,因爲他在foreach是一個動作,火花保證累加器將被更新一次,因此估價人員應該是正確的。累加器僅在階段完成時報告,因此當在動作中運行時,即使需要重新運行,部分結果也不會影響最終值。 – puhlen