2016-11-04 531 views
1

我們有搜索字段的窗口。每次用戶輸入搜索內容時都會執行。RxJava一次只能執行一個Observable

  1. 將搜索事件轉換爲數據流。

  2. 在每次新的搜索中,我們需要啓動異步網絡操作並關閉上一個。如何存檔這個效果?

編輯1:這是我試過的。它執行所有可觀察的事物!不是唯一的最後一個,哪裏出錯?

PublishSubject<Integer> subject = PublishSubject.create(); 

subject.switchMap(integer -> Observable.fromCallable(() -> { 
    try { 
     Thread.sleep(500); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    return "-" + integer + "-"; 
})).subscribe(s -> System.out.print(s)); 
for (int i = 0; i < 30; i++) subject.onNext(i); 
+0

我自己還是個初學者,但是難道你不是在觀察用戶搜索輸入,而是反彈,然後用去抖用戶輸入執行搜索? – Andreas

+0

@Andreas是的,這是我需要的一半。此外,一次只能執行一次執行等等。每個新值都取消以前的 – dooplaye

+0

您可能希望同時執行'debounce'和'switchMap',以避免每次按鍵後搜索以及取消舊搜索。 – akarnokd

回答

3

使用PublishSubjectswitchMap

PublishSubject<String> subject = PublishSubject.create(); 
subject 
    .switchMap(
    x -> networkOperationObservable(x) 
      .subscribeOn(Schedulers.io)) 
    .subscribe(subscriber); 

爲了充分利用此networkOperationObservable將明智地迴應unsubscribe呼叫(如關閉Socket或其他)。一般而言,Observable.using是首選工具。

+0

請檢查我的編輯。 – dooplaye

0

PublishSubject正是你所需要的。這是一個Observable您可以隨時添加事件。

創建它:

private Subject<String, String> filteringSubject = PublishSubject.create(); 

然後使用Observable.throttleWithTimeout()操作:

Subscriber<String> filteringSubscriber = new DefaultSubscriber<>(); 
filteringSubject 
    .throttleWithTimeout(250, TimeUnit.MILLISECONDS) 
    .doOnNext(new Action1<String>() { 
     @Override 
     public void call(String text) { 
      //make a network call 
     } 
    }) 
    .subscribe(filteringSubscriber); 

用法:

filteringSubject.onNext(text); 
+0

它不是。網絡調用必須返回一些值。 – dooplaye

+0

好吧,然後'FlatMap''String'網絡調用和訂閱整個'Obsevable'真正的'Subscriber',而不是假的。 –