2016-08-23 227 views
1

我是Flink的新手,我使用DataSet API工作。經過大量處理作爲最後階段之後,我需要將其中一個值除以其最大值來標準化。所以,我用.max()運算符來取最大值,後來我將結果作爲構造函數的參數傳遞給MapFunction。Flink執行數據流兩次

這有效,但所有的處理都執行兩次。執行一個作業來查找最大值,然後執行另一個作業以創建最終結果(從頭開始執行)......是否有任何解決方法只能執行一次整個數據流?

final List<Tuple6<...>> maxValues = result.max(2).collect(); 
    assert maxValues.size() == 1; 
    result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...) 

@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5") 
@FunctionAnnotation.ReadFields("f2") 
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> { 

    private final Tuple6<...> maxValues; 

    public NormalizeAttributes(Tuple6<...> maxValues) { 
     this.maxValues = maxValues; 
    } 

    @Override 
    public Tuple6<...> map(Tuple6<...> value) throws Exception { 
     value.f2 /= maxValues.f2; 
     return value; 
    } 
} 

回答

0

collect()立即觸發程序的執行直到由collect()請求的數據集。如果您稍後再次撥打env.execute()collect(),則該程序將再次執行。

除了執行的副作用之外,使用collect()將值分配給後續轉換還具有數據傳輸到客戶端以及稍後返回到羣集中的缺點。 Flink提供所謂的廣播變量,將DataSet作爲側面輸入發送到另一個轉換中。在你的程序

使用廣播變量將如下所示:

DataSet maxValues = result.max(2); 
result 
    .map(new NormAttrs()).withBroadcastSet(maxValues, "maxValues") 
    .writeAsCsv(...); 

NormAttrs功能應該是這樣的:

private static class NormAttr extends RichMapFunction<Tuple6<...>, Tuple6<...>> { 

    private Tuple6<...> maxValues; 

    @Override 
    public void open(Configuration config) { 
    maxValues = (Tuple6<...>)getRuntimeContext().getBroadcastVariable("maxValues").get(1); 
    } 

    @Override 
    public PredictedLink map(Tuple6<...> value) throws Exception { 
    value.f2 /= maxValues.f2; 
    return value; 
    } 
} 

您可以找到有關在documentation廣播變量的詳細信息。

+0

非常感謝! ;) – kaser