1
使用Spark 2.1,我有一個函數,它需要一個DataFrame
並檢查所有記錄是否在給定的數據庫(在這種情況下是Aerospike)。火花累加器計數不正確?
它看起來非常像這樣:
def check(df: DataFrame): Long = {
val finalResult = df.sparkSession.sparkContext.longAccumulator("finalResult")
df.rdd.foreachPartition(iter => {
val success = //if record is on the database: 1 else: 0
//if success = 0, send Slack message with missing record
finalResult.add(success)
}
df.count - finalResult.value
}
所以,鬆弛的消息的數量應與該函數返回(丟失記錄的總數)的數目,但往往這種情況並非如此 - 例如,我得到一條Slack消息,但是check = 2
。重新運行它提供了check = 1
。
任何想法發生了什麼?
那麼爲什麼我只收到一條Slack消息?如果它被處理了兩次,那麼我應該有兩條消息 – shakedzy
嗯,對不起,不確定那麼當我沒有使用數據幀太多時,它應該是所有的在一個分區上的數據,而不是隻有一個記錄,你確定你的成功只能是1或0嗎?除此之外,我什麼也沒有 – SiLaf
我認爲這是不正確的,因爲他在foreach是一個動作,火花保證累加器將被更新一次,因此估價人員應該是正確的。累加器僅在階段完成時報告,因此當在動作中運行時,即使需要重新運行,部分結果也不會影響最終值。 – puhlen