0

我想弄清楚如何限制會話的最大長度。在做觸發時。我現在的觸發看起來是這樣的:使用觸發器限制窗口的最大長度

return AfterEach.inOrder(
       // speculatively trigger 
       Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval).orFinally(AfterWatermark.pastEndOfWindow())), 
       // finally trigger for late 
       Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval))); 

這工作得很好,因爲它觸發每earlyFiringInterval時間單位,直到水印經過窗口的末尾,然後它會觸發每lateFiringInterval時間單位不止於此。

不幸的是,會話可能會持續數天,這會導致窗口長時間保持打開狀態,並導致水印滯留。我想建立一個觸發器,它可以「切割」的窗口,這樣可以:

  • 沒有會話可以比一些maxSessionLength時間(事件時間)更長。
  • 或者,將會話限制在窗格中的某些maxSessionLength事件數量。 - 這是在積累模式下運作的。 (不理想)

所以,到目前爲止,我有:

return AfterEach.inOrder(
       Repeatedly 
         // speculatively trigger at every 'earlyFiringInterval' 
         .forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval) 
         // terminate trigger when any of the following conditions are met: 
         // * We have collected either 'maxEventCount' events in the pane 
         // * Watermark has passed the window 
         .orFinally(AfterFirst.of(AfterPane.elementCountAtLeast(maxEventCount), AfterWatermark.pastEndOfWindow()))), 
       Repeatedly 
         // trigger for late data at every 'lateFiringInterval' 
         .forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval))) 
         .orFinally(AfterPane.elementCountAtLeast(maxEventCount)); 

我想知道如果這是要走或者有更好的方法可以做到「限制窗口大小」的方式。

回答

3

您可以允許水印通過指定前進,同時保持你的會話完全保真的OutputTimeFn像這樣:

Window.into(Sessions.withGapDuration(...)) 
     .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()) 

就像一個CombineFn決定是從分組輸出轉換(我們可以將GroupByKey視爲經由級聯組合),OutputTimeFn確定分組變換的輸出的時間戳

的SDK提供了一些常見的選擇:

  • OutputTimeFns.outputAtEndOfWindow()
  • OutputTimeFns.outputAtEarliestInputTimestamp()
  • OutputTimeFns.outputAtLatestInputTimestamp()

默認今天是outputAtEarliestInputTimestamp(),它允許在什麼時間戳可以方面最大的靈活性適用於下游生產的元素,但不幸的是以維持水的(必要的)成本標記。

如果您不打算在窗口內的時間戳上明確輸出,則選擇outputAtEndOfWindow()將允許水印儘可能快地前進。

:此功能被標記Experimental。這意味着它的API可能會改變(例如,而不是接受任意的OutputTimeFn實現,它可能僅限於幾個固定的常量)。這個概念幾乎肯定會保留下來,因爲我們總是需要決定分組轉換輸出的時間戳。

如果您仍然希望因爲其他原因而將會話截斷,請進行評論,然後我將詳細說明其他選項。

順便說一句,我強烈建議簡化觸發語法我們現在提供:

AfterWatermark.pastEndOfWindow() 
    .withEarlyFirings(
     AfterProcessingTime.pastFirstElementInPane() 
      .plusDelayOf(earlyFiringInterval)) 
    .withLateFirings(
     AfterProcessingTime.pastFirstElementInPane() 
      .plusDelayOf(lateFiringInterval)) 
+0

你的意思是說,'AfterEach.repeatedly(...)'觸發的問題和一個你在帖子末尾提到的內容是否等同? –

+1

我確實希望通過'outputTimeFns'(儘管有用)選項切斷會話,但這並不能幫助我找出一些退化情況,其中一些會話可能持續多天(通常是客戶端錯誤)的會話可能會持續水印回來幾天。 –

+0

謝謝。我會嘗試一下。有沒有辦法根據最大窗口/會話持續時間進行限制? –