2017-04-18 50 views
2

所以,我有二汽這樣實現從另一個流過濾器:如何使用RxJava

--1--2--3--4--5--6-| 
-----A-----B-------| 

我的目標是有這樣

--1----3------5--6-| 

我一直在使用運營商嘗試過流像takeUntil或skipUntil,但我沒有能夠產生的東西,工作。你能給我一些幫助嗎?

由於

回答

2

通常,有2和A之間的一致的問題,則必須定義一個窗口,第二數據流,可以防止第一個的值被髮射。例如,這將等待每件1毫秒(2.X):

import java.util.concurrent.TimeUnit; 

import io.reactivex.*; 
import io.reactivex.schedulers.TestScheduler; 

public class Coincidence { 

    public static void main(String[] args) { 
     TestScheduler sch = new TestScheduler(); 

     Flowable.interval(100, TimeUnit.MILLISECONDS, sch) 
     .onBackpressureBuffer() 
     .compose(coincide(Flowable.interval(300, TimeUnit.MILLISECONDS, sch), sch)) 
     .take(7) 
     .subscribe(v -> 
      System.out.printf("%d - %d%n", sch.now(TimeUnit.MILLISECONDS), v)); 

     sch.advanceTimeBy(1001, TimeUnit.MILLISECONDS); 
    } 

    static <T, U> FlowableTransformer<T, T> coincide(
      Flowable<U> other, Scheduler scheduler) { 
     return f -> { 
      return other.publish(g -> { 
       return f.flatMap(v -> { 
        return Flowable.just(v) 
          .delay(1, TimeUnit.MILLISECONDS, scheduler) 
          .takeUntil(g) 
        ; 
       }, 1) 
       .takeUntil(g.ignoreElements()) 
       ; 
      }); 
     }; 
    }; 
} 
+0

很好的回答。如果我們假設關於術語「巧合」的不同語義(如我在答案中所解釋的),你認爲我的答案可以是一個有效的選擇嗎? – GVillani82

+0

看起來沒問題。我的答案有一個向前的窗口,你有一個向後的窗口和總的事件順序。我們兩個都可能是對的,根據OP的期望,我們都可能是錯的。 – akarnokd

+0

感謝您的反饋。是的,我認爲這取決於期望。 – GVillani82

0

我認爲akarnokd給了唯一可能的解決辦法,如果你需要處理的「巧合」。

順便說一下,在大多數情況下,您可能不需要同時發生的事件,或者在特定的時間窗口中發生,但是您只需要檢查某個事件是否發生發射。例如,你的情況可以這樣調整:

--1-----2----3------4------5--6-| 
-------A----------B-------------| 

這意味着A(發生在2之後)將阻止它通過。這同樣適用於圖4和B.

true如果是這種情況我會解決這個問題如下: 合併兩個Observables,然後用掃描用於確定哪個項目應該被向下遊傳播,並最終濾波結果。

Observable.merge(Observable.just(1, 2, 3, 4, 5, 6).map(integer -> new ValueObject(integer)), 
     Observable.just("A", "B").map(string -> new BarrierObject(string))) 
     .scan((myObject, myObject2) -> { 
      if (myObject instanceof BarrierObject) { 
       myObject2.setFilter(); 
      } 

      return myObject2; 
     }) 
     .filter(myObject -> (myObject instanceof ValueObject) && myObject.isValid()) 
     .map(myObject -> (Integer) myObject.getValue()) 
     .subscribe(integer -> Log.d("test", "value: " + String.valueOf(integer))); 

ValueObjectBarrierObject擴展以下類:

abstract class MyObject { 
    boolean toBeFiltered; 
    Object value; 

    public void setFilter() { 
     toBeFiltered = true; 
    } 

    public boolean isValid() { 
     return !toBeFiltered; 
    } 

    public Object getValue() { 
     return value; 
    } 
} 
+0

@akarnokd你認爲這是一個合理的解決方案,與預先解釋的假設? – GVillani82

+1

爲了得到回覆發表你對他的回答的評論,他會得到通知,我想。 –