2017-10-16 80 views
2

我試圖在中間應用flatMap之後保持原始順序。闡述了我的意思了圖Rx swift在使用flatMap後保持原始流的原始順序地圖

這裏是:

---- 2-4-1 ------------------(原始流)

----------- 1--2 --------- 4--(網絡活動 - 由flatMap延遲表示)

------ --------- --------- 2 4-1(那受通緝的結果)

下面是詳細情況代碼:

persistMessageEventBus.flatMap({ num -> Observable<Int> in 

     print("aaab Doing \(num)") 

     let t2g = Observable.just(num).delay(Double(num), scheduler: MainScheduler.instance).do(onNext:{ num in print("aaab Done async \(num)")}) 

     return t2g 

    }).concatMap({ num -> Observable<Int> in 

     print("aaab Done map \(num)") 

     return Observable.just(num) 

    }).subscribe(onNext: { num in 

     print("aaab done \(num)") 

    }).addDisposableTo(disposeBag) 

    persistMessageEventBus.onNext(2) 
    persistMessageEventBus.onNext(4) 
    persistMessageEventBus.onNext(1) 

輸出是:

aaab Doing 2 
aaab Doing 4 
aaab Doing 1 
aaab Done async 1 
aaab Done map 1 
aaab done 1 
aaab Done async 2 
aaab Done map 2 
aaab done 2 
aaab Done async 4 
aaab Done map 4 
aaab done 4 

的通緝的輸出是:

aaab Doing 2 
aaab Doing 4 
aaab Doing 1 
aaab Done async 1 
aaab Done async 2 
aaab Done map 2 
aaab done 2 
aaab Done async 4 
aaab Done map 4 
aaab done 4 
aaab Done map 1 
aaab done 1 

是否有類似的東西在RxSwift?

回答

1

改爲使用.concatMap(),它保證了原始順序。

更新#1

那麼顯然它需要索引和一些緩衝。

 typealias Indexed = (num: Int, index: Int) 

     class Buffer { 
      let ordered = PublishSubject<Int>() 
      private var current = 0 
      private var buffer: [Int: Int] = [:] 
      func onNext(_ indexed: Indexed) { 
       self.buffer[indexed.index] = indexed.num 
       for index in self.buffer.keys.sorted() { 
        if index == current { 
         ordered.onNext(self.buffer[index]!) 
         self.buffer.remove(at: self.buffer.index(forKey: index)!) 
         current += 1 
        } 
       } 
      } 
     } 

     let buffer = Buffer() 

     buffer 
      .ordered 
      .subscribe(onNext: { num in 

       print("aaab done \(num)") 

      }) 
      .disposed(by: disposeBag) 

     persistMessageEventBus 
      .mapWithIndex { (num, index) -> Indexed in 
       return (num: num, index: index) 
      } 
      .flatMap({ indexed -> Observable<Indexed> in 

       print("aaab Doing \(indexed.num)") 

       let t2g = Observable.just(indexed).delay(Double(indexed.num), scheduler: MainScheduler.instance).do(onNext: { indexed in print("aaab Done async \(indexed.num)") }) 

       return t2g 

      }) 
      .subscribe(onNext: { indexed in 
       buffer.onNext(indexed) 
      }) 
      .disposed(by: disposeBag) 

     persistMessageEventBus.onNext(2) 
     persistMessageEventBus.onNext(4) 
     persistMessageEventBus.onNext(1) 
 
aaab Done async 1 
aaab done 2 
aaab Done async 2 
aaab done 4 
aaab Done async 4 
aaab done 1 
+0

是的,但隨後的網絡活動不會在平行所以不是尋找這樣的:--------------- 2 ----- ---- 4-1流將看起來像這樣--------------- 2 --------------------- ---- 4-1 – Rotem

+0

@Rotem見** UPDATE#1 ** –

+0

謝謝@ maxim-volgin,這就是我爲解決這個問題所做的一切,儘管我提出這個問題的原因是因爲我想知道在Rx中是否有本地運營商。無論如何,我會接受這個答案,謝謝,gg! – Rotem

相關問題