2016-09-15 204 views
2

我有重複元素的列表,說:Rx等同於GROUP BY的COUNT?

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 

我想通過他們與他們出現了多少次沿數值組他們,所以輸出會是對的:

{"A", 3}, {"B", 1}, {"C", 2} 

基本上是一個SQL語句像SELECT x, COUNT(1) GROUP BY x;

相當於我只拿到了這麼遠,呼籲GROUPBY他們:

source.groupBy(x -> x, x -> 1) 

但是,這將流轉換爲GroupedObservables,我無法找到一個很好的示例如何與他們繼續前進。我嘗試了reduce(),但這裏不好,因爲在groupBy()之後,它希望減少分組的觀察對象,而不是每個組內的元素。

這是可能與GroupedObservables?有沒有其他方法可以達到預期效果?

+1

你知道嗎,如果你有一個熱點觀測值或沒有一個'OnComplete'信號,你不能做這個計算? – Enigmativity

+0

不,我沒有想到,但似乎可以理解,謝謝。實際上,我列出了整個列表,而不是我的例子中的元素(我簡化了問題的代碼),所以我猜這在這裏不是問題,對吧? –

+0

是的,但如果它是一個冷的可觀測值,它會發出所有的值,您可能不需要Rx。你想要做什麼? – Enigmativity

回答

8

以下代碼:

source.groupBy(val -> val) 
    .flatMap(
     gr -> gr.count() 
       .map(count -> new Pair<>(gr.getKey(), count) 
    ) 
).subscribe(System.out::println); 

將打印出:

A=3 
B=1 
C=2 
+0

真高雅,謝謝! –

1
Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 
    Observable<KeyValue<String, Integer>> countStream = source 
      .groupBy(val -> val) 
      .flatMap(obs -> obs.count().flatMap(cnt -> Observable.just(new KeyValue<>(obs.getKey(), cnt)))); 

    private static class KeyValue<K, V> { 

    private final K key; 
    private final V val; 

    public KeyValue(K key, V val) { 
     this.key = key; 
     this.val = val; 
    } 

    public K getKey() { 
     return key; 
    } 

    public V getVal() { 
     return val; 
    } 
} 
1

的另一種方法是使用collectcollectInto方法,如下所示。

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 

source.collectInto(new HashMap<String, MutableInt>(), (map, elem) -> { 
    if (map.containsKey(elem)) { 
    map.get(elem).increment(); 
    } else { 
    map.put(elem, new MutableInt(1)); 
    } 

}).subscribe(System.out::println); 

順便說一句,如果我們使用Reactorcollect是這樣做的提示方式,因爲在大量組的情況下,GROUPBY後flatMap將掛起。

javadoc of Reactor

注意GROUPBY效果最好組的低基數,所以相應地選擇你的keyMapper功能。
...
值得注意的是,當條件產生大量的組時,如果組不適合下游使用(例如由於設置了過低的maxConcurrency參數的flatMap),則可能導致掛起。

還有一個與此相關的github issue