1
我是Flink的新手,我使用DataSet API工作。經過大量處理作爲最後階段之後,我需要將其中一個值除以其最大值來標準化。所以,我用.max()
運算符來取最大值,後來我將結果作爲構造函數的參數傳遞給MapFunction。Flink執行數據流兩次
這有效,但所有的處理都執行兩次。執行一個作業來查找最大值,然後執行另一個作業以創建最終結果(從頭開始執行)......是否有任何解決方法只能執行一次整個數據流?
final List<Tuple6<...>> maxValues = result.max(2).collect();
assert maxValues.size() == 1;
result.map(new NormalizeAttributes(maxValues.get(0))).writeAsCsv(...)
@FunctionAnnotation.ForwardedFields("f0; f1; f3; f4; f5")
@FunctionAnnotation.ReadFields("f2")
private static class NormalizeAttributes implements MapFunction<Tuple6<...>, Tuple6<...>> {
private final Tuple6<...> maxValues;
public NormalizeAttributes(Tuple6<...> maxValues) {
this.maxValues = maxValues;
}
@Override
public Tuple6<...> map(Tuple6<...> value) throws Exception {
value.f2 /= maxValues.f2;
return value;
}
}
非常感謝! ;) – kaser