2016-06-10 48 views
2

這可觀察正在執行以下Rxswift地圖+ CONCAT並行

  • 鑑於觀察到的源
  • 我們用地圖來執行一些異步工作
  • 我們使用CONCAT返回異步工作的結果爲了

以下是返回所需的結果,但我想開始 並行的異步工作。

用Rx做正確的方法是什麼?

import RxSwift 

func delay(time: Int, closure:() -> Void) { 
    dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))), 
    dispatch_get_main_queue(), closure) 
} 

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> { 
    return Observable.create() { (observer) -> Disposable in 
    print(desc) 
    delay(time) { 
     observer.onNext(value) 
     observer.onCompleted() 
    } 
    return NopDisposable.instance 
    } 
} 

let seq = Observable 
    .of(1, 2, 3, 4, 5) 
    .map { (n) -> Observable<Int> in 
    return doAsyncWork(n, 
     desc: "start \(n) - wait \(5 - n)", 
     time: 6 - n 
    ) 
    } 
    .concat() 

let sharedSeq = seq.shareReplay(0) 
sharedSeq.subscribeNext { print("=> \($0)") } 
sharedSeq.subscribeCompleted { print("=> completed") } 

這產生

//start 1 - wait 4 
// => 1 
//start 2 - wait 3 
// => 2 
//start 3 - wait 2 
// => 3 
//start 4 - wait 1 
// => 4 
//start 5 - wait 0 
// => 5 

所需的輸出將

//start 1 - wait 4 
//start 2 - wait 3 
//start 3 - wait 2 
//start 4 - wait 1 
//start 5 - wait 0 
// => 1 
// => 2 
// => 3 
// => 4 
// => 5 
+0

你說你想要並行啓動'Observable's,但是你想要的輸出似乎不同意。如果它們都沒有延遲地啓動,那麼不應該是'5,4,3,2,1',因爲'5'等待0秒,'4'等待1秒等等。 – solidcell

+0

如果你更新你的文章以包含一個'swift'標籤,你的問題和答案將得到語法高亮顯示。 – solidcell

+0

對不起@solidcell這個問題非常模糊,我現在希望更清楚地編輯它。我的目標是按順序返回異步工作的結果,但要同時開始每個異步工作。如果保持順序無關緊要,那麼使用你所建議的flatmap可以完美地工作 – Pierre

回答

2

這似乎是工作不知道這是雖然

import RxSwift 

func delay(time: Int, closure:() -> Void) { 
    dispatch_after(
    dispatch_time(DISPATCH_TIME_NOW, Int64(time * Int(NSEC_PER_SEC))), 
    dispatch_get_main_queue(), closure) 
} 

func doAsyncWork(value: Int, desc: String, time: Int) -> Observable<Int> { 
    return Observable.create() { (observer) -> Disposable in 
    print(desc) 
    delay(time) { 
     observer.onNext(value) 
     observer.onCompleted() 
    } 
    return NopDisposable.instance 
    } 
} 

let seq = Observable 
    .of(1, 2, 3, 4, 5) 
    .map { (n) -> Observable<Int> in 
    let o = doAsyncWork(n, 
     desc: "start \(n) - wait \(5 - n)", 
     time: 6 - n 
    ).shareReplay(1) 
    o.subscribe() 
    return o.asObservable() 
    } 
    .concat() 

let sharedSeq = seq.shareReplay(0) 
sharedSeq.subscribeNext { print("=> \($0)") } 
sharedSeq.subscribeCompleted { print("=> completed") } 
+0

是的,那就是我建議你做的。正如你發現的那樣,你問題中的代碼不是創建一個「熱」可觀察的,只是一個「冷」可觀察的。因此,通過立即訂閱,可觀察者將開始工作。你的解決方案的另一個變體是使用一個'Subject',它已經是一個熱門的可觀察對象。您將不需要'o.subscribe'。 – solidcell

1

「所需的最佳答案輸出「似乎不同意你的想法e使Observable「並行」啓動,但是延遲它們的元件,使得「5」沒有延遲,「4」具有1秒延遲,「3」具有2秒延遲等等。

I會認爲你在尋找這樣的輸出:

start 1 - wait 4 
start 2 - wait 3 
start 3 - wait 2 
start 4 - wait 1 
start 5 - wait 0 
5 
4 
3 
2 
1 

這是你可以用它來做到這一點:如果你的意思是別的東西

Observable.range(start: 1, count: 5) 
    .flatMap { n -> Observable<Int> in 
     let waitInterval = 5 - n 
     print("start \(n) - wait \(waitInterval)") 
     return Observable.just(n) 
      .delaySubscription(RxTimeInterval(waitInterval), scheduler: MainScheduler.instance) 
    } 
    .subscribeNext { i in 
     print(i) 
    } 
    .addDisposableTo(disposeBag) 

,你也許可以輕鬆地調整這個片段來完成你的目標。

0

這對你現在沒有幫助,但也許會在未來幫助別人。

您正在查找的操作員稱爲concatMap。但是,目前,它不存在於RxSwift

目前存在關閉的PR here

0

爲什麼您預期的那樣,這並不工作的原因是concat訂閱該源可觀測量一次一個,等待第一個完成它簽約的第二等之前。

在RxJava中有concatEager,它可以滿足您的需求 - 在開始時訂閱所有來源,同時仍然保持秩序。但似乎沒有在Swift中。

你可以做的是壓縮每個項目的索引,flatMap,按索引排序並解壓縮。