我正在實現一個可觀察的來自Resource
的線。從正確的線程調用RxJava2可以取消/拋棄
問題是,這個資源真的不喜歡從它創建的不同線程中關閉(它會殺死一隻小狗並在發生這種情況時拋出一個異常)。
當我處理認購,資源Cancellable
/Disposable
從main
線程中調用,而觀察到的是訂閱的Schedulers.io()
。
這裏是科特林代碼:
fun lines(): Observable<String> =
Observable.create { emitter ->
val resource = NetworkResource()
emitter.setCancellable {
resource.close() // <-- main thread :(
}
try {
while (!emitter.isDisposed)
emitter.onNext(resource.readLine()) // <-- blocked here!
} catch (ioe: IOException) {
emitter.tryOnError(ioe) // <-- this also triggers the cancellable
}
}
val disposable = lines()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }
disposable.dispose() // <-- main thread :)
問題:是否可以調用從正確的*線程Cancellable
,考慮到訂閱線程被阻塞在resource.readLine()
?
*正確的線程意味着從subscribeOn(Schedures.io())
。
編輯:恐怕這個問題並沒有一個正確的答案,除非resource.close()
是線程安全的或resource.dataReady
某種投票的實施,使得線程沒有被阻塞。
我想你應該嘗試'unsubscribeOn'方法來定義'cancellable'的執行位置。 – masp
@masp謝謝你的評論,但它不起作用。請參閱我對以下答案的評論。任何其他想法? – ESala