2014-10-29 52 views
1

我有以下代碼根據@ a.bertucci在這裏提供的示例Emit objects for drawing in the UI in a regular interval using RxJava on Android,其中我使用Timer對一個Observable進行壓縮。當我通過調用processDelayedItems()來觸發訂閱時,壓縮的Observable中的代碼[A]只執行一次,一個項目發送到[B]。我希望代碼[A]在觸發後連續運行,並且每1500毫秒保持發射物體,但顯然它只在這裏運行一次。爲什麼這個observable只發出一個值

private static void processDelayedItems() { 

    Observable.zip(
      Observable.create(new Observable.OnSubscribe<Object>() { 

       @Override public void call(Subscriber<? super Object> subscriber) { 
        // [A] this code is only called once 
        subscriber.OnNext(o) 
       } 

      }), 
      Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() { 
       @Override public Object call(Object entity, Long aLong) { 
        return entity; 
       } 
      } 
    ) 
    .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Action1<Object>() { 

       @Override public void call(Object entity) { 
        // ... and accordingly one item is emitted [B] 
       } 

      }, new Action1<Throwable>() { 

       @Override public void call(Throwable throwable) { 
        throwable.printStackTrace(); 
       } 

      }, new Action0() { 

       @Override public void call() { 

       } 

      }); 

} 
  1. 任何人都可以看到我這裏有問題嗎?是否需要從函數外部引用Observable以使其保持更長時間?它是由GC(Android)收集的嗎?這個函數是靜態的嗎?

  2. Observable的生命期規則是什麼?有沒有什麼最佳實踐應該引用更長時間運行的Observable,以及它們是否可以是靜態的?在我的測試中,我注意到它並不重要,但也許它在這裏,當涉及一個計時器。

-

更正代碼[還沒有成型]:

  • 添加重複()

    Observable.zip(
         Observable.create(new Observable.OnSubscribe<Object>() { 
    
          @Override public void call(Subscriber<? super Object> subscriber) { 
           // [A] this code is only called once 
           subscriber.OnNext(o); 
           subscriber.OnCompleted(); 
          } 
    
         }).repeat(Schedulers.newThread()), 
         Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() { 
          @Override public Object call(Object entity, Long aLong) { 
           return entity; 
          } 
         } 
    ) 
    .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) 
         .subscribe(new Action1<Object>() { 
    
          @Override public void call(Object entity) { 
           // ... and accordingly one item is emitted [B] 
          } 
    
         }, new Action1<Throwable>() { 
    
          @Override public void call(Throwable throwable) { 
           throwable.printStackTrace(); 
          } 
    
         }, new Action0() { 
    
          @Override public void call() { 
    
          } 
    
         }); 
    

回答

1

您需要repeat產生無限的可觀測。例如,

Observable.create(new Observable.OnSubscribe<Object>() { 

     @Override public void call(Subscriber<? super Object> subscriber) { 
      // [A] this code is only called once 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onNext(o); 
      } 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onCompleted(); 
      } 
     } 

    }).repeat(Schedulers.newThread()); 

難道我需要從引用可觀測的功能外,以保持它活着有更多的時間?它是由GC(Android)收集的嗎?這個函數是靜態的嗎?

由於您使用Schedulers.newThread()timer,會有其中有你而觀察到的一些參考其他線程。你不需要更多的工作。

Observable的生命期規則是什麼?有沒有什麼最佳實踐應該引用更長時間運行的Observable,以及它們是否可以是靜態的?在我的測試中,我注意到它並不重要,但也許它在這裏,當涉及一個計時器。

你說得對。沒關係。

+0

感謝您的詳細回覆。這是有道理的,因爲Y combinator在兩側都會查找可觀察對象,如果一側沒有東西需要拉鍊,則不會發射任何東西。 – 2014-10-29 14:12:40

+0

我剛剛測試過 - 不知道爲什麼 - Observable仍然只發射一次。我更正的代碼在我原來的帖子下方。即使我不這麼認爲,請檢查我是否已將repeat()添加到正確的Observable中。 – 2014-10-29 14:34:10

+0

另一個問題:有沒有辦法實現相同的不同?像使用zip而不是使用zip,有一個定時器/時間間隔Observable發出固定的時間間隔,然後觸發另一個Observable在定時器/間隔Oberservable後面串行切換? – 2014-10-29 14:36:20

1

關於你的評論,爲簡單起見,你可以這樣做,

Observable.timer(1500, 1500, TimeUnit.MILLISECONDS) 
     .flatMap(new Func1<Long, Observable<Object>>() { 
      @Override 
      public Observable<Object> call(Long aLong) { 
       String o = "0"; 
       return Observable.from(o); 
      } 
     }) 
     .subscribe(new Action1<Object>() { 
      @Override 
      public void call(Object aLong) { 
       System.out.println(aLong); 
      } 
     }); 

在這裏,你仍然可以得到定時器的好處,而不在上面添加的ZIP /重複。它仍然有點冗長,但有點簡單。

+0

這看起來好多了。當我使用Observable.from(o); o是Object類型,它告訴我「from」是折舊的。所以我將它改爲Observable.from(new Object [] {entity}); - 以防有人讀這個。 – 2014-10-29 18:10:15

+0

使用'just'而不是棄用的'from'。 – zsxwing 2014-10-30 01:43:18

+0

@米格爾·拉維尼,爲什麼不是'地圖'? – zsxwing 2014-10-30 01:44:15