2017-04-22 45 views
1

私人的數據流中buySideVolumeWMA(數據流buyPressureTradeStream){如何讓Apache的弗林克滑動窗口只滑動到達窗口大小之後?

Integer windowSize = 3; 
    Integer windowslide = 1; 

    DataStream<Double> buySideVolumeWMAStream = buyPressureTradeStream.countWindowAll(windowSize, windowslide) 
      .apply(new AllWindowFunction<String, Double, GlobalWindow>() { 

       @Override 
       public void apply(GlobalWindow window, Iterable<String> values, Collector<Double> out) 
         throws Exception { 
        Double buySideVolumeWMA = 0.0; 
        Integer weight = windowSize; 
        Integer numerator = 1; 

        for (String tradeString : values) { 
         JSONObject json = new JSONObject(tradeString); 
         Double tradeVolume = (Double) json.get("Volume"); 
         buySideVolumeWMA += ((tradeVolume * numerator)/weight); 
         slf4jLogger.info("tradeVolume " + tradeVolume + " , " + "numerator , " + numerator 
           + " weight , " + weight + " buySideVolumeWMA " + buySideVolumeWMA); 
         numerator++; 

        } 
        numerator = 1; 

        out.collect(buySideVolumeWMA/2); 
        buySideVolumePressure = buySideVolumeWMA/2; 
        // slf4jLogger.info("buySideVolumePressure :" + 
        // buySideVolumePressure); 


    buySideVolumeWMAStream.print().setParallelism(5); 

    return buySideVolumeWMAStream; 

} 

================================= =======================================在這個程序中,我使用的3窗口大小和幻燈片尺寸1.我希望它開始滑動一旦接收流數3的數據,然後只能由1開始滑動,但發生的事情是我的程序啓動,因爲它首先接收數據立即滑動,然後將其滑入每單它接收的數據。所以如何讓它滑動後才能收到count 3的數據然後再滑動1?

+0

你的問題還不清楚。你能詳細說明你的問題嗎?你應該添加一個你得到的和你想要的東西的例子。 – ImbaBalboa

+0

@ImbaBalboa。感謝您的回覆。我在我的程序下面添加了一些關於我的問題的更多詳細信息。請您在此引導我?謝謝 – Dhinesh

回答

1

您可以添加偏移到你的窗口。這是Window命令的第三個參數。這樣你可以在我的意見後面開始。從文檔

例子:

// sliding processing-time windows offset by -8 hours 
input 
    .keyBy(<key selector>) 
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) 
    .<windowed transformation>(<window function>); 

要了解更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html