2016-05-29 95 views
1

我試圖合併兩個流,其中一人應該是有狀態(如使用不頻繁更新的靜態數據)後:阿帕奇星火合併updateStateByKey()

SparkConf conf = new SparkConf().setAppName("Test Application").setMaster("local[*]"); 
JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10)); 
context.checkpoint("."); 
JavaDStream<String> dataStream = context.socketTextStream("localhost", 9998); 
JavaDStream<String> refDataStream = context.socketTextStream("localhost", 9999); 

JavaPairDStream<String, String> pairDataStream = dataStream.mapToPair(e -> { 
    String[] tmp = e.split(" "); 
    return new Tuple2<>(tmp[0], tmp[1]); 
}); 

JavaPairDStream<String, String> pairRefDataStream = refDataStream.mapToPair(e -> { 
    String[] tmp = e.split(" "); 
    return new Tuple2<>(tmp[0], tmp[1]); 
}).updateStateByKey((Function2<List<String>, Optional<String>, Optional<String>>) (strings, stringOptional) -> { 
    if (!strings.isEmpty()) { 
     return Optional.of(strings.get(0)); 
    } 
    return Optional.absent(); 
}); 

pairDataStream.join(pairRefDataStream).print(); 


context.start(); 
context.awaitTermination(); 

當我寫1 aaa到第一個流和1 111立即進入第二,一切正常,我看到合併的結果。但是,當我在一分鐘後將1 bbb寫入第一個流時,我什麼都看不到。

我能正確理解updateStateByKey()的作用嗎?或者我錯了?

回答

3

updateStateByKey完全符合您的要求。尤其是,如果當前窗口中不包含任何數據(strings.isEmpty())你指示它忘記(返回Optional.absent();):

if (!strings.isEmpty()) { 
    return Optional.of(strings.get(0)); 
} 
return Optional.absent(); 

,而你可能想的是回到以前的狀態:

if (!strings.isEmpty()) { 
    return Optional.of(strings.get(0)); 
} 
return stringOptional;