2016-07-26 68 views
0

拆分DSTREAM如何給給定一個匹配

val dstream = ssc.createStream(..) 

我們如何獲得分時段/分組/拆分組Dstreams從它,大意如下:

val (s1, s2, s3): (DStream[_],DStream[_],DStream[_]) = 
    dstream.map{ in match => 
    case <cond1> => bucket1Value 
    case <cond2> => bucket2Value 
    case _ => bucket3Value 
    }.<some bucketing/grouping operation> 

RE:可能重複這是一個完全不同的問題 - 另一個是關於RDD的不是DStream的!

+0

@LostInOverflow DStream與RDD的完全不同 - 你爲什麼會建議它們是相同的? – javadba

+0

@LostInOverflow如果你同意OP的評論,你可以取消你的標誌嗎? (如果再次點擊「標誌」,您將看到一個按鈕) –

+0

DStream是RDD的一個序列,每個操作都應用在RDD上,因此拆分DStream與拆分RDD是同樣的問題。如果有解決方案來拆分RDD,則有一種解決方案來拆分DStream。它對面的dir。 – 2016-07-27 09:33:42

回答

0

回答我自己的問題:但如果有人(任何人?)有建議直接執行操作,它將很樂意接受。

因此,這裏是a解決方案 - 雖然不優雅。

val s1 = dstream.flatMap{ in match => 
    case r if <cond1> => bucket1Value 
    case _   => None 
} 
val s2 = dstream.flatMap{ in match => 
    case r if <cond2> => bucket1Value 
    case _   => None 
} 
val s3 = dstream.flatMap{ in match => 
    case r if !<cond1> && !<cond2> => bucket3Value 
    case _   => None 
} 
相關問題