2017-10-21 112 views
0

我有一個(費時)計算,類似的東西:如何跳過可觀測量的項目,而組合使用RxJS與流逝的定時器處理

async calculation(input: MyInputType): Promise<MyOutputType> { 
    // fetch some infos from an HTTP endpoint 
    // calculate result by combining received infos with input 
    return result; // Promise which resolves after all calculation is complete 
} 

計算所用的輸入經由來自另一個部件一個可觀察的(BehaviorSubject),它發出的物品有時比他們可以處理的速度快。因此,輸出依賴於http端點的可觀察信息和獲取的信息,這些信息也可能隨時發生變化。

重要:只有最後計算結果是我的應用程序相關,必須在任何時間的變量可用。
我的目標是使用RxJS和其反應性運營商設置一個可觀察到的鏈具有以下行爲:從所述源BehaviorSubject

  • 物品發射而計算當前正在處理應該被忽略(backpressuring防止DoS)但是...
  • 當前計算完成或發生超時時,如果有新的計算項目,則必須直接爲最近的輸入項目啓動新的計算。
  • 如果在至少5分鐘內沒有開始計算,則必須再次使用最近的輸入項目開始新的計算。 (這對更新即將到來的http信息很重要)

有沒有人有任何想法如何解決這個問題?
(我使用的是RxJS 5,TypeScript,Node.js,但也歡迎其他語言的反應式解決方案。)

+0

我認爲這個問題相當接近你想要達到的目標。 https://stackoverflow.com/q/46785128/482868 –

+1

如果計算時間超過5分鐘,會發生什麼情況?是否還有新的計算開始,如果有的話,舊計算會發生什麼? – concat

+0

好問題,但是由於我在第二個項目符號中提到的超時,所以不應該發生這種情況。這個超時(計算本身)當然少於5分鐘。因此在這種情況下可以放棄結果。 – Niehno

回答

0

從我的立場來看,有兩個單獨的問題。首先是如何在5分鐘內計算已經計算的物品。我們首先填充空間大於5分鐘的克隆重複的最後一個值。克隆將在稍後重要。

// inputs$: Observable<MyInputType> 
const spacefilled_inputs$ = inputs$.switchMap(input => Observable.interval(5000).map(() => Object.assign({}, input))) 
            .merge(input$); 

第二個問題是棘手:如何觸發calculation兩個先前計算(如果有一個)完成後,當一個新的輸入從spacefilled_inputs$發射時?公平的警告:RxJS自然不會這麼做。我先放下了一些代碼,並解釋後:

const promise_subject = new Subject<Promise<MyOutputType>>(); 
const results$ = promise_subject.asObservable() 
           .flatMap(promise => Observable.fromPromise(promise)); 
Observable.combineLatest(spacefilled_inputs$, results$) 
      .pairwise() 
      .scan(([inputs_changed, results_changed], [prev_pair, next_pair]) => { 
      // assumes that both MyInputType and MyOutputType are non-primitive objects 
      inputs_changed = inputs_changed || prev_pair[0] !== next_pair[0]; 
      results_changed = results_changed || prev_pair[1] !== next_pair[1]; 

      if(inputs_changed && results_changed) { 
       promise_subject.next(calculation(next_pair[1])); 
       return [false, false]; 
      } 
      return [inputs_changed, results_changed]; 
      }, [true, true]); 

最重要的,我們將通過受流的計算結果,使他們能信號進一步計算。這裏的關鍵是pairwise,它發出前一個和後來的元素的幀,所以我們可以比較它們。如果輸入和結果都發生了變化,則scan的主體會比較並保持狀態。

注意檢查更改對對象引用使用嚴格的等式。如果MyOutputTypeMyInputType是原語,則會錯誤地篩選出重複值。你需要做一些類似於輸入和結果流的計數器。

+0

非常感謝您的回覆! 第一部分:我很喜歡這個解決方案。我只是想知道爲什麼'Object.assign'是必要的。 (我的輸入對象可能很大)。調用mapTo是不夠的嗎? (輸入=> Observable.interval(5000).mapTo(輸入))。合併(輸入$)' 第二部分:我不得不承認,我還沒有理解你的代碼100% 。解決方案似乎對我來說比較複雜。當我更好地理解你的代碼時,我會再次報告。第2部分更容易的解決方案是受歡迎的。 – Niehno

+0

@Niehno如果'scan'檢測到上述變化,那麼如果沒有克隆,計算將不會每5分鐘重試一次,因爲它會得到相同的對象。我同意,這不是最高性能的解決方案,因此您可以將計數器與兩個流相關聯,但不幸的是以第2部分的複雜性爲代價。 – concat