2017-02-04 103 views
1

我有一個Observable,我正在使用它將一個承諾轉換爲訂閱。這導致了我需要迭代的集合來調用每個元素上的HTTP服務。我使用forkJoin等待所有這些調用完成,以便我可以做其他事情,但不幸的是,我的訂閱沒有被調用。你看到我在這裏失蹤了嗎?可觀察訂閱沒有被調用

Observable.fromPromise(this.users.getElements()).subscribe(results => { 
    Observable.forkJoin(
    results.map(
     aUser => this.HttpService.submitUser(aUser).subscribe(
     results => { 
      this.progress += 1; 
     }, 
     err => { 
      this.progress += 1; 
      this.handleError(<any>err); 
     }) 
    ).subscribe(
     //it never gets to either of these calls after all service calls complete 
     data => { 
     debugger; 
     console.log(data); 
     this.reset(); 
     }, 
     err => { 
     debugger; 
     console.log(err); 
     this.reset(); 
     } 
    )); 
}); 

回答

2

有一件事是你不要訂閱每個Observable傳遞給forkJoin()。運營商必須自己做。

如果您想在每個Observable完成時收到通知,您可以使用.do(undefined, undefined,() => {...})

let observables = [ 
    Observable.of(42).do(undefined, undefined,() => console.log('done')), 
    Observable.of('a').delay(100).do(undefined, undefined,() => console.log('done')), 
    Observable.of(true).do(undefined, undefined,() => console.log('done')), 
]; 

Observable.forkJoin(observables) 
    .subscribe(results => console.log(results)); 

這將打印到控制檯:

done 
done 
done 
[ 42, 'a', true ] 

最後還有就是.finally()運營商。但是,這與使用.do()不一樣。

編輯:

當任何源可觀察量失敗forkJoin()操作者重新發射的誤差(這意味着它也失敗)。
這意味着您需要分別在每個源Observable中捕獲錯誤(例如,使用catch()運算符)。

let observables = [ 
    Observable.throw(new Error()) 
    .catch(() => Observable.of('caught error 1')) 
    .do(undefined, undefined,() => console.log('done 1')), 

    Observable.of('a') 
    .delay(100).catch(() => Observable.of('caught error 2')) 
    .do(undefined, undefined,() => console.log('done 2')), 

    Observable.of(true) 
    .catch(() => Observable.of('caught error 3')) 
    .do(undefined, undefined,() => console.log('done 3')), 
]; 

Observable.forkJoin(observables) 
    .subscribe(results => console.log(results)); 

它打印:

done 1 
done 3 
done 2 
[ 'caught error 1', 'a', true ] 
+0

好的,我明白你的意思了,但如果其中一個觀察對象有例外,那麼forkJoin會在其他人完成之前馬上拿起它。我認爲forkJoin應該等待所有人完成,但是從源頭上看,它會標記爲「完成」而不是下一個。在我的例子中,我希望每個observable能夠優雅地處理錯誤並繼續。有沒有辦法做到這一點? – occasl

+0

@occasl查看我的更新。 – martin

0

我不認爲你需要在地圖上訂閱。

Observable.fromPromise(this.users.getElements()).subscribe(results => { 
    Observable.forkJoin(
    results.map(
     aUser => this.HttpService.submitUser(aUser)) 
    ).subscribe(
     //it never gets to either of these calls after all service calls complete 
     data => { 
     debugger; 
     console.log(data); 
     this.reset(); 
     }, 
     err => { 
     debugger; 
     console.log(err); 
     this.reset(); 
     } 
    )); 
}); 

注意的是,在rxjs例子在這裏:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

他們不同意個別觀測 - ForkJoin讓他們都去,然後等待他們全部返回(在你的訂閱)

編輯:

的forkjoin源是在這裏:

https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/forkjoin.js

並且看起來好像沒有掛鉤來確定每次完成的時間。我認爲最好的方法來處理一個用戶界面欄會讓每個映射的observables單獨訂閱,所有這些都調用一個函數來增加UI計數欄變量,並對「完整性」進行一些測試,以允許您使用這些數據。

+0

呀,後來我將如何增加我(對UI的進度條)爲每一個完成櫃檯?基本上我想在每個人完成時做一些事情,然後在完成時做一些事情。 – occasl

+0

我剛剛查看了forkjoin源代碼,我不認爲它會公開任何允許您插入並查看每個代碼何時進入的內容。我認爲您必須單獨訂閱每個映射並調用某種爲您的UI進度條計數功能。 – chrispy