2016-09-26 108 views
1

我們有以下流。rxjs在處理數據之前檢查流是否爲空

const recorders = imongo.listCollections('recorders') 
    .flatMapConcat(names => { 
     const recorders = names 
      .map(entry => entry.name) 
      .filter(entry => !_.contains(
       ['recorders.starts', 
       'recorders.sources', 
       'system.indexes', 
       'system.users'], 
       entry)); 
     console.log(recorders); 
     return Rx.Observable.fromArray(recorders); 
    }); 

recorders.isEmpty() 
    .subscribe(
     empty => { 
      if(empty) { 
       logger.warn('No recorders found.'); 
      } 
     }, 
     () => {} 
    ); 

recorders.flatMapConcat(createRecorderIntervals) 
    .finally(() => process.exit(0)) 
    .subscribe(
     () => {}, 
     e => logger.error('Error while updating: %s', e, {}), 
     () => logger.info('Finished syncing all recorders') 
    ); 

如果流爲空,那麼我們不想createRecorderIntervals。上面這段代碼正在工作。但是,檢查流是否爲空,導致console.log被執行兩次。這是爲什麼發生?我能以某種方式修復它嗎?

編輯:所以,我去下面的方式,重新思考它感謝@馬丁的回答後

const recorders = imongo.listCollections('recorders') 
    .flatMapConcat(names => { 
     const recorders = names 
      .map(entry => entry.name) 
      .filter(entry => !_.contains(
       ['recorders.starts', 
       'recorders.sources', 
       'system.indexes', 
       'system.users'], 
       entry)); 

     if(!recorders.length) { 
      logger.warn('No recorders found.'); 
      return Rx.Observable.empty(); 
     } 

     return Rx.Observable.fromArray(recorders); 
    }) 
    .flatMapConcat(createRecorderIntervals) 
    .finally(() => scheduleNextRun()) 
    .subscribe(
     () => {}, 
     e => logger.error('Error while updating: %s', e, {}), 
     () => logger.info('Finished syncing all recorders') 
    ); 

回答

1

當你調用一個Observablesubscribe()方法它使運營商的整個產業鏈要創建它轉在您的情況下撥打imongo.listCollections('recorders')兩次。

您可以在致電flatMapConcat(createRecorderIntervals)之前插入一個操作員,檢查結果是否爲空。我心裏有特別的其中之一,但可能還有其他適合您的需求,甚至更好:

  • takeWhile() - 需要謂詞作爲參數,併發出onComplete當它返回false

那麼你的代碼將是這樣的:

const recorders = imongo.listCollections('recorders') 
    .flatMapConcat(names => { 
     ... 
     return Rx.Observable.fromArray(recorders); 
    }) 
    .takeWhile(function(result) { 
     // condition 
    }) 
    .flatMapConcat(createRecorderIntervals) 
     .finally(() => process.exit(0)) 
     .subscribe(...); 

我不知道到底是你的代碼做什麼,但我希望你的想法。

編輯:如果你想通知當整個可觀測是空的比有是方式多:

  • do()運營商和定製Observer對象。您將編寫一個自定義觀察器,並在.flatMapConcat(createRecorderIntervals)之前使用do()運算符。該對象將計算它的next回調被調用的次數,以及前面的Observable完成時,可以判斷是否至少有一個或根本沒有結果。

  • 創建ConnectableObservable。這可能與我們開始時所做的最相似。您將使用publish()運算符將您的recorders變成ConnectableObservable。然後,您可以訂閱多個觀察者而不觸發運營商鏈。當你把所有的觀察家訂閱你叫connect(),它會依次射出值所有觀察員:

    var published = recorders.publish(); 
    
    published.subscribe(createObserver('SourceA')); 
    published.subscribe(createObserver('SourceB')); 
    
    // Connect the source 
    var connection = published.connect(); 
    

    在你的情況,你會創建兩個Subjects(因爲它們作爲可觀察和觀察員在同一時間)並將其中一個與isEmpty()連鎖,第二個與flatMapConcat()連鎖。看到該文檔的詳細信息:http://reactivex.io/documentation/operators/connect.html

我覺得第一個選項實際上是對你更容易。

+0

在這種情況下,我不能檢查整個流是否爲空,因爲我沒有得到整個數組,但單獨的值,對不對? – XeniaSis

+0

我只是想在沒有錄像機的情況下顯示信息 – XeniaSis

+0

我更新了我的答案。 tl; dr查看['do()'](http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-do)運算符。 – martin