2016-01-20 92 views
1

我正在使用Flink 0.10.1的DataSet API編寫應用程序。 我可以使用Flink中的單個操作員獲得多個收集器嗎?Apache Flink:如何使用Flink DataSet API從一個數據集創建兩個數據集

我想要做的是什麼樣的東西如下:

val lines = env.readTextFile(...) 
val (out_small, out_large) = lines **someOp** { 
    (iterator, collector1, collector2) => { 
    for (line <- iterator) { 
     val (elem1, elem2) = doParsing(line) 
     collector1.collect(elem1) 
     collector2.collect(elem2) 
    } 
    } 
} 

目前我打電話mapPartition兩次從一個源數據集使兩個數據集。

val lines = env.readTextFile(...) 
val out_small = lines mapPartition { 
    (iterator, collector) => { 
    for (line <- iterator) { 
     val (elem1, elem2) = doParsing(line) 
     collector.collect(elem1) 
    } 
    } 
} 
val out_large = lines mapPartition { 
    (iterator, collector) => { 
    for (line <- iterator) { 
     val (elem1, elem2) = doParsing(line) 
     collector.collect(elem2) 
    } 
    } 
} 

由於doParsing功能是相當昂貴的,我想每行只有一次調用它。

p.s.如果你能讓我知道其他方法來以更簡單的方式做這種事情,我將非常感激。

回答

3

Flink不支持多個收集器。但是,可以通過增加一個額外的字段,其指示輸出類型更改解析步驟的輸出:

val lines = env.readTextFile(...) 
val intermediate = lines **someOp** { 
    (iterator, collector) => { 
    for (line <- iterator) { 
     val (elem1, elem2) = doParsing(line) 
     collector.collect(0, elem1) // 0 indicates small 
     collector.collect(1, elem2) // 1 indicates large 
    } 
    } 
} 

接下來消耗輸出intermediate兩次並過濾每一個用於所述第一屬性。第一個過濾器爲0第二個過濾器爲1(您也添加了一個投影來擺脫第一個屬性)。

   +---> filter("0") ---> 
       | 
intermediate --+ 
       | 
       +---> filter("1") --->