2015-04-06 64 views
2

是否可以取消未來的火花並且仍然可以通過處理後的元素獲得更小的RDD?Spark異步接口的部分結果?

星火異步操作「記錄」在這裏

http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.rdd.AsyncRDDActions

而未來本身具有豐富的功能集

http://spark.apache.org/docs/1.2.1/api/scala/index.html#org.apache.spark.FutureAction

使用情況下,我想的就是有一個非常大的地圖,可以在計算30分鐘後中止,並且仍然收集 - 甚至是迭代或saveAsObjectFile - 已有效映射的RDD的子集。

回答

2

FutureAction.cancel會導致失敗(請參閱comment in JobWaiter.scala),因此您無法使用它來獲取部分結果。我認爲沒有辦法通過異步API來完成。

相反,您可能會在30分鐘後停止處理輸入。

val stopTime = System.currentTimeMillis + 30 * 60 * 1000 // 30 minutes from now. 
rdd.mapPartitions { partition => 
    if (System.currentTimeMillis < stopTime) partition.map { 
    // Process it like usual. 
    ??? 
    } else { 
    // Time's up. Don't process anything. 
    Iterator() 
    } 
} 

請記住,一旦所有的洗牌依存關係完成,這隻會產生變化。 (即使30分鐘過去,也不能停止洗牌。)

+0

這完全沒有經過測試。讓我知道它是否有效! –

+1

:-)我想所有的異步函數也是未經測試的,至少在生產站點。 – arivero