2017-03-16 78 views
3

我正在構建一個Angular2應用程序,並且有兩個BehaviourSubjects,我想將它們合併爲一個訂閱。我提出了兩個http請求,並且希望在兩人都回來時發起一個事件。我在看forkJoin vs combineLatest。看起來combineLatest會在任何一個behvaviorSubjects被更新時觸發,而forkJoin只會在所有behaviorsSubjects被更新後觸發。它是否正確?必須有一個普遍接受的模式,這不是嗎?如何合併多個rxjs BehaviourSubjects

編輯
這裏是我的angular2組件訂閱我的behaviorSubjects的一個示例:

export class CpmService { 

    public cpmSubject: BehaviorSubject<Cpm[]>; 

    constructor(private _http: Http) { 
     this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>()); 
    } 

    getCpm(id: number): void { 
     let params: URLSearchParams = new URLSearchParams(); 
     params.set('Id', id.toString()); 

     this._http.get('a/Url/Here', { search: params }) 
      .map(response => <Cpm>response.json()) 
      .subscribe(_cpm => { 
       this.cpmSubject.subscribe(cpmList => { 
        //double check we dont already have the cpm in the observable, if we dont have it, push it and call next to propigate new cpmlist everywheres 
        if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0)) { 
         cpmList.push(_cpm); 
         this.cpmSubject.next(cpmList); 
        } 
       }) 
      }); 
    } 
} 

這裏是我的組件的訂閱的一個片段:

this._cpmService.cpmSubject.subscribe(cpmList => { 
     doSomeWork(); 
    }); 

但是相反在單個訂閱上觸發doSomeWork()我只想在cpmSubject和fooSubject觸發時觸發doSomeWork()。

+0

一個http請求不能直接返回'BehaviorSubject' - 我猜想,你是'nexting' HTTP的響應每到一個'BehaviorSubject'或者甚至訂閱'Subject'到'get/post/put'? – olsn

+0

@olsn是的,我訂閱了http響應,並將我的主題與他們在服務類中的回覆聯繫起來 – cobolstinks

+0

行爲主題的公共訪問是反模式。請使用帶有「as Observable」的downcast的getter。所以你不能使用服務外的下一個電話 - >分離問題 –

回答

4

你可以使用zip - 運算符,它的工作原理類似於combineLatest或forkJoin,但只有當兩個流都發出觸發:http://reactivex.io/documentation/operators/zip.html

zipcombineLatest之間的區別是: 郵編將只會引發」並行「,而combineLatest將觸發任何更新併發出每個流的最新值。 因此,假設下列2個流:

streamA => 1--2--3 
streamB => 10-20-30 

zip

  • 「1,10」
  • 「2,20」
  • 「3,30」

combineLatest

  • 「1,10」
  • 「2,10」
  • 「2,20」
  • 「3,20」
  • 「3,30」

這裏也是一個活例子:

const a = new Rx.Subject(); 
 
const b = new Rx.Subject(); 
 

 
Rx.Observable.zip(a,b) 
 
    .subscribe(x => console.log("zip: " + x.join(", "))); 
 
Rx.Observable.combineLatest(a,b) 
 
    .subscribe(x => console.log("combineLatest: " + x.join(", "))); 
 

 
a.next(1); 
 
b.next(10); 
 
a.next(2); 
 
b.next(20); 
 
a.next(3); 
 
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>


另一個注意事項:永遠不會訂閱訂閱。 做這樣的事情,而不是:

this._http.get('a/Url/Here', { search: params }) 
      .map(response => <Cpm>response.json()) 
      .withLatestFrom(this.cpmSubject) 
      .subscribe([_cpm, cpmList] => { 
       if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0)) { 
        cpmList.push(_cpm); 
        this.cpmSubject.next(cpmList); 
       } 
      }); 
+0

zip和combineLatest有什麼區別? – cobolstinks

+0

我已更新答案 – olsn

+0

謝謝,詳細的答案。我試圖嘗試一下,但我沒有在我的Rx.Observable對象上找到zip方法。 – cobolstinks