2017-10-18 82 views
2

我正在實現一個可觀察的來自Resource的線。從正確的線程調用RxJava2可以取消/拋棄

問題是,這個資源真的不喜歡從它創建的不同線程中關閉(它會殺死一隻小狗並在發生這種情況時拋出一個異常)。

當我處理認購,資源Cancellable/Disposablemain線程中調用,而觀察到的是訂閱的Schedulers.io()

這裏是科特林代碼:

fun lines(): Observable<String> = 
     Observable.create { emitter -> 
      val resource = NetworkResource() 
      emitter.setCancellable { 
       resource.close() // <-- main thread :(
      } 
      try { 
       while (!emitter.isDisposed) 
        emitter.onNext(resource.readLine()) // <-- blocked here! 
      } catch (ioe: IOException) { 
       emitter.tryOnError(ioe) // <-- this also triggers the cancellable 
      } 
     } 

val disposable = lines() 
     .subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe { Log.i(TAG, "Line: $it" } 

disposable.dispose() // <-- main thread :) 

問題:是否可以調用從正確的*線程Cancellable,考慮到訂閱線程被阻塞在resource.readLine()

*正確的線程意味着從subscribeOn(Schedures.io())

編輯:恐怕這個問題並沒有一個正確的答案,除非resource.close()是線程安全的或resource.dataReady某種投票的實施,使得線程沒有被阻塞。

+0

我想你應該嘗試'unsubscribeOn'方法來定義'cancellable'的執行位置。 – masp

+0

@masp謝謝你的評論,但它不起作用。請參閱我對以下答案的評論。任何其他想法? – ESala

回答

2

Schedulers.io()管理線程池,所以它可能會或可能不會使用同一個線程來處理您的資源。您將不得不使用自定義調度程序和unsubscribeOn()運算符,以確保您的Observable已訂閱並取消訂閱到同一個線程中。喜歡的東西:

Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 

val disposable = lines() 
     .unsubscribeOn(customScheduler) 
     .subscribeOn(customScheduler) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe { Log.i(TAG, "Line: $it" } 
+0

謝謝你的回答,但它不起作用。來自執行程序的線程在'resource.readLine()'處被阻塞,因此使用'unsubscribeOn(...)'調度可取消的程序永遠不會調用它,因爲線程永遠不會被釋放。想法? – ESala

+0

可以從另一個線程中調用'readLine()'嗎? –

1

如果你不介意的通話延遲到NetworkResource#close一點點,爲什麼不

fun lines(): Observable<String> = 
      Observable.create { emitter -> 
       val resource = NetworkResource() 
       try { 
        while (!emitter.isDisposed) { 
         emitter.onNext(resource.readLine()) 
        } 
        resource.close() 
       } catch (ioe: IOException) { 
        emitter.tryOnError(ioe) 
       } 
      } 

但仍然有一個問題,這一點:在IOException沒有人的情況下,將永遠呼叫NetworkResource#close(在我看來你的例子中也是這樣)。

嘗試解決這個問題:

fun lines(): Observable<String> = 
      Observable.create { emitter -> 
       val resource = NetworkResource() 
       try { 
        while (!emitter.isDisposed) { 
         emitter.onNext(resource.readLine()) 
        } 
       } catch (ioe: IOException) { 
        emitter.tryOnError(ioe) 
       } finally { 
        resource.close() // try-catch here, too? 
       } 
      } 

,或者使用 「科特林 - 嘗試與 - 資源」 功能use

fun lines(): Observable<String> = 
      Observable.create { emitter -> 
       NetworkResource().use { resource -> 
        try { 
         while (!emitter.isDisposed) { 
          emitter.onNext(resource.readLine()) 
         } 
        } catch (ioe: IOException) { 
         emitter.tryOnError(ioe) 
        } 
       } 
      } 

我希望這有助於。希望你過個愉快的週末。

+0

嘿Peti,謝謝你的回答,但是它並沒有解決問題,因爲線程仍然會停留在'resource.readLine()',所以'emitter.isDisposed'標誌將不會被檢查,直到一行收到了,這可能是一段時間。除非從另一個線程調用'resource.close()'是安全的,否則恐怕沒有答案。週末愉快! – ESala

+0

是的,專用的IO線程會調用'close'延遲...這是「如果你不介意延遲呼叫」這個答案的介紹:-)但這真的是一個問題嗎?因爲'NetworkResource'似乎被阻塞,所以專用的IO線程無論如何都會在正在進行的'lines()'訂閱的生命週期中被浪費掉...... – Peti

+0

你說得對,被「浪費」的線程是一個小問題,我更擔心的是無法關閉底層網絡連接。 – ESala

0

如何選擇替代路徑?

一)使NetworkResource線程安全(如果你是通過與 「代理」包裝NetworkResource的源代碼的控制)

B)?對於「代理」,我的意思是一個內部使用專用線程的代理,它與NetworkResource(構建,readLine,關閉,......)

+0

再次感謝Peti。關於a)我無法控制它,關於b)已經嘗試過它。 – ESala

相關問題