2016-07-06 67 views
4

你好RxJava主人,RxJava更改線程後CONCAT地圖

在我目前的Android項目,我遇到了一些問題僵局而與RxJava和SQLite播放。我的問題是:

  1. 我在一個線程
  2. 調用Web服務啓動一個事務,並保存一些東西在數據庫
  3. CONCAT映射另一個觀察的功能
  4. 嘗試寫其他的東西在數據庫上--->獲得了僵局

這裏是我的代碼:

//define a scheduler for managing transaction in the same thread 
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor()); 


Observable.just(null) 
      /* Go to known thread to open db transaction */ 
      .observeOn(mScheduler) 
      .doOnNext(o -> myStore.startTransaction()) 
      /* Do some treatments that change thread */ 
      .someWebServiceCallWithRetrofit() 
      /* Return to known thread to save items in db */ 
      .observeOn(mScheduler) 
      .flatMap(items -> saveItems(items)) 
      .subscribe(); 

public Observable<Node> saveItems(List<Item> items) { 
    Observable.from(items) 
      .doOnNext(item -> myStore.saveItem(item)) //write into the database OK 
      .concatMap(tab -> saveSubItems(item)); 
} 

public Observable<Node> saveSubItems(Item item) { 
    return Observable.from(item.getSubItems()) 
      .doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different 
} 

爲什麼突然所有的RxJava正在改變線程?即使我指定我希望他在我自己的調度程序中觀察。我通過在saveSubItem之前添加另一個observeOn來做了一個骯髒的修復,但這可能不是正確的解決方案。

我知道,當你調用與改造web服務,響應被轉發到一個新的線程(這就是爲什麼我創建了自己的調度找回在線程我開始我的SQL事務)。但是,我真的不明白RxJava如何管理線程。

非常感謝您的幫助。

回答

0

我所知doOnNext方法被稱爲在不同的線程,比以前的代碼,因爲它是從asynchroniously序列的其餘部分運行。

示例:您可以執行多個其他呼叫,將其保存到數據庫,並在doOnNext(...)內通知視圖/演示者/控制者一個progres。您可以在保存到數據庫或/和保存到數據庫之前執行此操作。 我會建議你的是「flatMapping」代碼。

所以saveItems方法是這樣的(如果myStore.saveSubItems返回結果):

public Observable<Node> saveSubItems(Item item) { 
return Observable.from(item.getSubItems()) 
     .flatMap(subItem -> myStore.saveSubItems(subItem)) 
} 

使用「flatMapping」該操作在同一線程上與先前順序運行的保證,該順序繼續再flaMap函數結束。

1

副作用運營商(一樣flatMap)執行同步上任意線程調用它。嘗試類似

Observable.just(null)    
      .doOnNext(o -> myStore.startTransaction()) 
      .subscribeOn(mScheduler)  // Go to known thread to open db transaction 
      /* Do some treatments that change thread */ 
      .someWebServiceCallWithRetrofit()      
      .flatMap(items -> saveItems(items)) 
      .subscribeOn(mScheduler) // Return to known thread to save items in db 
      .observeOn(mScheduler) // Irrelevant since we don't observe anything 
      .subscribe();