首先,Observable.merge
流一起:這給你一個所有物品的流。 (在下面的代碼,我使用的自定義類Either
來標記每個流。)
然後,對於流中的每個項目,嘗試與其它類型的先前觀察到的項匹配它,並輸出這對。如果沒有,請將其保存爲稍後匹配。
最後,一旦流完成,其餘不匹配的元素將不會與任何內容匹配,因此它們可以不成對排放。
import io.reactivex.Observable
data class Entity(val id: Int)
data class Dto(val id: Int)
sealed class Either<out A, out B>
data class Left<A>(val value: A) : Either<A, Nothing>()
data class Right<B>(val value: B) : Either<Nothing, B>()
fun <A : Any, B : Any, C> joinById(a: Observable<A>, idA: (A) -> C, b: Observable<B>, idB : (B) -> C): Observable<Pair<A?, B?>> {
val unmatchedA = mutableMapOf<C, A>()
val unmatchedB = mutableMapOf<C, B>()
val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest ->
when (latest) {
is Left -> {
val id = idA(latest.value)
unmatchedB.remove(id)?.let { [email protected] Observable.just(latest.value to it) }
unmatchedA.put(id, latest.value)
}
is Right -> {
val id = idB(latest.value)
unmatchedA.remove(id)?.let { [email protected] Observable.just(it to latest.value) }
unmatchedB.put(id, latest.value)
}
}
Observable.empty<Nothing>()
}
return Observable.concat(merged, Observable.create { emitter ->
unmatchedA.values.forEach { emitter.onNext(it to null) }
unmatchedB.values.forEach { emitter.onNext(null to it) }
emitter.onComplete()
})
}
fun main(args: Array<String>) {
val entities = Observable.just(Entity(2), Entity(1), Entity(4))
val dtos = Observable.just(Dto(3), Dto(2), Dto(1))
joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println)
}
(Entity(id=2), Dto(id=2))
(Entity(id=1), Dto(id=1))
(Entity(id=4), null)
(null, Dto(id=3))
注意,這可能有一些奇怪的行爲,如果ID流中的重複,並根據流的結構有可能,這將最終緩衝很多存儲元件。
您希望Observables等待相應元素多長時間? –
無限期我想。來自DB和Dtos的實體來自Http請求,所以我應該在內存中擁有完整的數據集 –
如果是這樣,爲什麼要進行流式處理,可能會更容易將它們轉換爲內存集合,然後進行轉換。 –