2017-04-02 91 views
4

現在我持有一個Observable<Observable<Integer>,我怎樣才能將它轉換成包含n元笛卡兒乘積的Observable<int[]>n-ary笛卡爾乘積inRxJava

例如:

Observable<Observable<Integer> ob = Observable.just(
    Observable.just(0,1), 
    Observable.just(2,3), 
    Observable.just(4,5) 
); 
ob...... -> (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5) 

回答

1

首先,你需要輸入Observable s的固定數。其次,不需要阻塞,但可能需要緩存,因爲需要多次使用第二,第三等其他緩存。

import java.util.*; 

import io.reactivex.Observable; 

public class Cartesian { 

    static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) { 
     return sources.toList().flatMapObservable(list -> cartesian(list)); 
    } 

    static Observable<int[]> cartesian(List<Observable<Integer>> sources) { 
     if (sources.size() == 0) { 
      return Observable.<int[]>empty(); 
     } 
     Observable<int[]> main = sources.get(0).map(v -> new int[] { v }); 

     for (int i = 1; i < sources.size(); i++) { 
      int j = i; 
      Observable<Integer> o = sources.get(i).cache(); 
      main = main.flatMap(v -> { 
       return o.map(w -> { 
        int[] arr = Arrays.copyOf(v, j + 1); 
        arr[j] = w; 
        return arr; 
       }); 
      }); 
     } 

     return main; 
    } 

    public static void main(String[] args) { 
     cartesian(Observable.just(
      Observable.just(0, 1), 
      Observable.just(2, 3), 
      Observable.just(4, 5) 
     )) 
     .subscribe(v -> System.out.println(Arrays.toString(v))); 
    } 
} 
0

以異步方式創建笛卡兒積是硬的或在某種意義上是不可能的。如果阻塞是OK,你可以做這樣的事情

public class Main 
{ 

    static class ProductIterator<T> implements Iterator<T[]> 
    { 
     private final List<List<T>> componentsList; 
     private final Class<T> componentClass; 
     private final int[] indices; 
     private boolean hasNext; 

     public ProductIterator(List<List<T>> componentsList, Class<T> componentClass) 
     { 
      this.componentsList = componentsList; 
      this.componentClass = componentClass; 
      this.indices = new int[componentsList.size()]; 
      this.hasNext = this.indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size(); 
     } 

     @Override 
     public boolean hasNext() 
     { 
      return hasNext; 
     } 

     @Override 
     public T[] next() 
     { 
      T[] res = (T[]) Array.newInstance(componentClass, componentsList.size()); 
      for (int i = 0; i < componentsList.size(); i++) 
      { 
       res[i] = componentsList.get(i).get(indices[i]); 
      } 

      // move next 
      indices[0]++; 
      for (int i = 0; i < componentsList.size() - 1; i++) 
      { 
       if (indices[i] == componentsList.get(i).size()) 
       { 
        indices[i] = 0; 
        indices[i + 1]++; 
       } 
      } 
      hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size(); 

      return res; 
     } 
    } 

    public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass) 
    { 
     return Observable.fromIterable(new Iterable<T[]>() 
     { 
      @Override 
      public Iterator<T[]> iterator() 
      { 
       // postpone blocking up until iterator is requested 
       // and by this point we can't postpone anymore 
       Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList(); 
       return new ProductIterator<T>(componentsList.blockingGet(), componentClass); 
      } 
     }); 
    } 

    public static void main(String[] args) throws Exception 
    { 
     Observable<Observable<Integer>> ob = Observable.just(
       Observable.just(0, 1), 
       Observable.just(2, 3), 
       Observable.just(4, 5) 
     ); 

     Observable<Integer[]> product = product(ob, Integer.class); 
     product.forEach(a -> System.out.println(Arrays.toString(a))); 
    } 
} 

它可以提高此代碼,以避免阻塞,但你仍然必須從所有Observable的高速緩存所有的結果和代碼將更加複雜。無論如何,最有可能的阻塞是不可接受的,試圖獲得笛卡爾產品是個壞主意。

0

嗯,我可以自己解決它。但是有沒有更優雅的方式?
(該toArray方法轉換的Observable<T>T[]

Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) { 
     List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last(); 
     return Observable.create(new SyncOnSubscribe<int[], int[]>() { 
      @Override 
      protected int[] generateState() { 
       int[] array = new int[list.size()]; 
       Arrays.fill(array, 0); 
       return array; 
      } 

      @Override 
      protected int[] next(int[] state, Observer<? super int[]> observer) { 
       int[] next = new int[list.size()]; 
       for (int i = 0; i < next.length; i++) { 
        next[i] = list.get(i)[state[i]]; 
       } 
       observer.onNext(next); 
       state[state.length - 1]++; 
       for (int i = state.length - 1; i >= 0; i--) { 
        int delta = list.get(i).length - state[i]; 
        if (delta > 0) { 
         break; 
        } else if (delta == 0) { 
         state[i] = 0; 
         if (i == 0) { 
          observer.onCompleted(); 
          break; 
         } 
         state[i - 1]++; 
        } 
       } 
       return state; 
      } 
     }); 
    }