1
私人的數據流中buySideVolumeWMA(數據流buyPressureTradeStream){如何讓Apache的弗林克滑動窗口只滑動到達窗口大小之後?
Integer windowSize = 3;
Integer windowslide = 1;
DataStream<Double> buySideVolumeWMAStream = buyPressureTradeStream.countWindowAll(windowSize, windowslide)
.apply(new AllWindowFunction<String, Double, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<String> values, Collector<Double> out)
throws Exception {
Double buySideVolumeWMA = 0.0;
Integer weight = windowSize;
Integer numerator = 1;
for (String tradeString : values) {
JSONObject json = new JSONObject(tradeString);
Double tradeVolume = (Double) json.get("Volume");
buySideVolumeWMA += ((tradeVolume * numerator)/weight);
slf4jLogger.info("tradeVolume " + tradeVolume + " , " + "numerator , " + numerator
+ " weight , " + weight + " buySideVolumeWMA " + buySideVolumeWMA);
numerator++;
}
numerator = 1;
out.collect(buySideVolumeWMA/2);
buySideVolumePressure = buySideVolumeWMA/2;
// slf4jLogger.info("buySideVolumePressure :" +
// buySideVolumePressure);
buySideVolumeWMAStream.print().setParallelism(5);
return buySideVolumeWMAStream;
}
================================= =======================================在這個程序中,我使用的3窗口大小和幻燈片尺寸1.我希望它開始滑動一旦接收流數3的數據,然後只能由1開始滑動,但發生的事情是我的程序啓動,因爲它首先接收數據立即滑動,然後將其滑入每單它接收的數據。所以如何讓它滑動後才能收到count 3的數據然後再滑動1?
你的問題還不清楚。你能詳細說明你的問題嗎?你應該添加一個你得到的和你想要的東西的例子。 – ImbaBalboa
@ImbaBalboa。感謝您的回覆。我在我的程序下面添加了一些關於我的問題的更多詳細信息。請您在此引導我?謝謝 – Dhinesh