2016-05-12 66 views
2

Spark Streaming中的DStream中的每個微批處理結束時是否可以執行某些操作?我的目標是計算Spark處理的事件的數量。 Spark Streaming給了我一些數字,但平均值似乎也總結爲零值(因爲一些微批是空的)。在Spark Streaming中的微批次結束之前執行操作

例如我確實收集了一些統計數據並希望將它們發送到我的服務器,但收集數據的對象僅在某個批處理中存在,並且將從頭開始初始化以用於下一批處理。我希望能夠在完成批處理和對象消失之前調用我的「完成」方法。否則,我會丟失尚未發送到服務器的數據。

+0

你有什麼不爲你工作的一些代碼的例子嗎? – maasg

+0

這有點難以解釋。我們使用我們以前用java編寫的代碼。它嵌入在map-function中。我們的運營商收集性能數據並將其發送到我們的服務器正在每個新批次重新初始化。在我們的操作員被「殺死」之前,能夠將數據發送到我們的服務器將是一件好事。 – chAlexey

回答

0
+0

這似乎是相當不錯的方向。我肯定會在週末嘗試一下。 :) – chAlexey

+0

這是一個很好的建議,但是:這樣一個監聽器是由驅動程序初始化的。由於我的代碼在某個執行器上執行,我需要在執行器上調用我的「完成」功能。通過這種方式,我沒有收到批量完成等事件的任何更新。你知道任何可能的解決方法嗎? – chAlexey