2014-11-03 77 views
3

我想通過滑動窗口將部分函數傳遞給在DStream批處理中捕獲的所有RDD的聯合。可以說,我對建設離散到1分二批流超過10秒的窗口操作:Spark:如何將PartialFunction傳遞給DStream?

val ssc = new StreamingContext(new SparkConf(), Seconds(1)) 
val stream = ssc.socketStream(...) 
val window = stream.window(Seconds(10)) 

window會的K許多RDDS。我想在這些RDD的所有K的聯合上使用collect(f: PartialFunction[T, U])。我可以使用foreachRDD致電工會運營商++,但我想返回RDD而不是Unit並避免副作用。

我正在尋找的是一個DStream

def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T] 

減速,我可以使用像這樣:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc)) 

但這不是可用的星火流的API中。

有沒有人有任何好的想法,將流中捕獲的RDD合併到單個RDD中,這樣我就可以傳入部分函數了?或者爲了實施我自己的RDD減速器?也許這個功能在後續的Spark發佈中出現?

+0

計算函數將允許你在一段時間內獲得RDD。 – Anant 2014-11-03 16:44:31

+0

@Anant這個時期的開始和結束在哪裏? DStream方法'compute'只接受'validTime'參數。這是我的窗戶的開始還是結束?另外,我將如何處理與我的批次相同的間隔重複調用'compute'?我正在尋找一些不那麼有狀態的東西。 – nmurthy 2014-11-03 20:46:59

+0

@nmurthy你不能在DStream上「收集」。你能否進一步解釋你想要做什麼?還有另一種方法。 – maasg 2014-11-03 23:16:56

回答

2

部分函數不直接受DStream操作支持,但實現相同功能並不困難。

例如,讓我們舉一個小部分功能,需要一個String的生成字符串的詮釋,如果它是一個數字:

val pf:PartialFunction[String,Int] = {case x if (Try(x.toInt).isSuccess) => x.toInt} 

而且我們有一個字符串的DSTREAM:

val stringDStream:DStream[String] = ??? // use your stream source here 

然後,我們可以將部分函數應用於DStream,如下所示:

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)