2016-01-22 67 views
2

我正在編寫一項服務,用戶可以從Spotify播放列表中粘貼網址,然後將播放列表導出到其他服務中。對於需要粘貼到請求中的每個跟蹤網址,都需要對Spotify API進行操作。限制由rxjs進行的http調用

此代碼:

 Rx.Observable.fromArray<ITrackIdentifier>(this._allTracks) 
      .pluck<string>("id") 
      .distinct() 
      .flatMap(
       (trackId) => this.spotifyService.lookupTrack(trackId). 
        catch((error) => this.handleError(error))) 
      .subscribe(
       (result) => this.handleTrackLookupResult(result), 
       (error) => this.handleError(error), 
       () => this.handleComplete() 
      ); 
  1. 創建從ITrackIdentifiers
  2. 列表可觀察到取軌道標識的ID來創建一個可觀察的字符串(IDS)
  3. 刪除任何重複的ID在列表中
  4. 爲每個http調用創建一個可觀測值以進行發現(並捕獲錯誤)
  5. 合併將所有這些可觀測量的結果合併爲一個具有平面圖的流

除了添加大量曲目之外,這實際上工作正常。我的樣本播放列表中有一個樣本播放列表超過500個,因此立即進行了500個調用,瀏覽器需要處理它們並從緩存中返回項目,以便瀏覽器速度較慢並鎖定,並且由於我超出api調用限制,Spotify會返回大量錯誤。

我想只能說10個電話同時運行。 Merge with maxConcurrent集似乎是在Stackoverflow上討論的完美解決方案。

這應該是這樣的:

 Rx.Observable.fromArray<ITrackIdentifier>(this._allTracks) 
      .pluck<string>("id") 
      .distinct() 
      .map(
       (trackId) => this.spotifyService.lookupTrack(trackId). 
        catch((error) => this.handleError(error))) 
      .merge(10) 
      .subscribe(
       (result) => this.handleTrackLookupResult(result), 
       (error) => this.handleError(error), 
       () => this.handleComplete() 
      ); 

但它是行不通的。在Chrome網絡調試器中,您可以看到同時進行的所有呼叫,並且大多數時間排隊等待直到它們失敗。

爲什麼不能正常工作?我還可以繞過這個問題嗎?

這裏是Github checkin與這個階段的項目:

+0

合併工作,因爲它應該。 它限制了訂閱的數量。 但是在'合併'之前,你有一個'map',它實際上提出所有請求,然後'合併'踢入。 – psx

回答

4

您的代碼使用merge的問題是spotifyService.lookupTrack未返回Observable而是PromiseObservable的某些功能類似flatMaphandle Promises as well,但ObservablePromise之間的差異在於Observable是惰性的,而Promise不是。根據用戶3743222的建議,您可以使用Observable.defer從承諾工廠函數中創建懶惰可觀察值。這個小例子是用JavaScript代替TypeScript的,所以可以在這裏運行。

console.log = x => {var d = document,b=d.body,p=d.createElement('pre'); p.style.margin = "0"; p.appendChild(d.createTextNode(''+x)); b.appendChild(p); window.scrollTo(0, b.scrollHeight); }; 
 

 
function log_delay(timeout, value) { 
 
    return new Promise(resolve => { 
 
    console.log('Start: ' + value); 
 
    setTimeout(() => { 
 
     console.log('End: ' + value); 
 
     resolve(value); 
 
    }, timeout); 
 
    }); 
 
} 
 

 
Rx.Observable.range(0, 6) 
 
.map(x => Rx.Observable.defer(
 
() => log_delay(1000, x) 
 
    .catch(e => console.log('Inner catch')) 
 
)) 
 
.merge(2) 
 
.subscribe(
 
    s => console.log('Result: ' + s), 
 
    s => console.log('Error: ' + s), 
 
    s => console.log('Complete') 
 
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>

+0

非常感謝。現在完美運作。 Git集線器在此修補程序中:https://github.com/Roaders/SpotifyExportTool/tree/be9e91b208e1378c5f4ed677d73e05f06a6c16f9 – Roaders

+0

注意merge()現在是RXJS 5中的mergeAll()。 – kayjtea

0

我設法得到它有點工作,我怎麼想,但我仍然好奇,爲什麼合併沒有工作。 這裏唯一的ID列表構建,然後我們使用concatMap創建可觀察到的每個ID,然後等待延遲時間移動到下一個項目之前:

 Rx.Observable.fromArray<ITrackIdentifier>(this._allTracks) 
      .pluck<string>("id") 
      .distinct() 
      .concatMap((id, index) => Rx.Observable.interval(50).take(1).map(() => { return id })) 
      .flatMap(
       (trackId) => this.spotifyService.lookupTrack(trackId). 
       catch((error) => this.handleError(error))) 
      .subscribe(
       (result) => this.handleTrackLookupResult(result), 
       (error) => this.handleError(error), 
       () => this.handleComplete() 
      ); 

在這個例子中,我等待每個調用之間的50毫秒。這大大減少了錯誤。

以下是Github checkin與該階段的項目。

+0

你可以只用'()=> id'而不用'()=> {return ID; }' –

+0

另外,'Rx.Observable.just(id).delay(50)'也做同樣的工作 –