2014-09-03 66 views
5

我有一些大型的文本文件,我想通過對其行進行分組來處理。是否可以在java 8中做一個懶惰的groupby,返回一個流?

我試圖用新的流媒體功能,如

return FileUtils.readLines(...) 
      .parallelStream() 
      .map(...) 
      .collect(groupingBy(pair -> pair[0])); 

的問題是,據我所知,這會產生一個地圖。

有沒有什麼辦法可以像上面那樣生成高級代碼,例如生成一個條目流?

UPDATE:我在找的東西就像是python的itertools.groupby。我的文件已經排序(通過pair [0]),我只是想逐個加載組。

我已經有了一個迭代解決方案。我只是想知道是否有更具說明性的方式來做到這一點。順便說一句,使用番石榴或其他第三方庫不會是一個大問題。

+2

哪有你做得很懶通過...分組?爲了對流中包含的對象的某些屬性進行分組,您必須遍歷流中的所有元素。 – Eran 2014-09-03 20:27:31

+0

你是什麼意思的「分組線」?你的意思是像Stream'groupBy'方法裝倉,還是你的意思是批量讀取多行? – dkatzel 2014-09-03 20:29:27

+0

感謝您的意見,增加了一個更新的問題。 – 2014-09-03 21:13:12

回答

3

你想要實現的任務與分組完全不同。 groupingBy不依賴於Stream元素的順序,而是依據Map的算法應用於分類器Function的結果。

你想要的是將具有共同屬性值的相鄰項目摺疊成一個List項目。甚至沒有必要通過該屬性對Stream進行排序,只要您能保證所有具有相同屬性值的項目都被聚類。

也許有可能把這個任務作爲一個減少,但對我來說,最終的結構看起來太複雜了。

所以,除非此功能直接支持被添加到Stream S,迭代器爲基礎的方法看起來最務實的對我說:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> { 
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
      Stream<? extends T> s, Function<? super T, ? extends G> f) { 
     return StreamSupport.stream(new Folding<>(s.spliterator(), f), false); 
    } 
    private final Spliterator<? extends T> source; 
    private final Function<? super T, ? extends G> pf; 
    private final Consumer<T> c=this::addItem; 
    private List<T> pending, result; 
    private G pendingGroup, resultGroup; 

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) { 
     source=s; 
     pf=f; 
    } 
    private void addItem(T item) { 
     G group=pf.apply(item); 
     if(pending==null) pending=new ArrayList<>(); 
     else if(!pending.isEmpty()) { 
      if(!Objects.equals(group, pendingGroup)) { 
       if(pending.size()==1) 
        result=Collections.singletonList(pending.remove(0)); 
       else { 
        result=pending; 
        pending=new ArrayList<>(); 
       } 
       resultGroup=pendingGroup; 
      } 
     } 
     pendingGroup=group; 
     pending.add(item); 
    } 
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) { 
     while(source.tryAdvance(c)) { 
      if(result!=null) { 
       action.accept(entry(resultGroup, result)); 
       result=null; 
       return true; 
      } 
     } 
     if(pending!=null) { 
      action.accept(entry(pendingGroup, pending)); 
      pending=null; 
      return true; 
     } 
     return false; 
    } 
    private Map.Entry<G,List<T>> entry(G g, List<T> l) { 
     return new AbstractMap.SimpleImmutableEntry<>(g, l); 
    } 
    public int characteristics() { return 0; } 
    public long estimateSize() { return Long.MAX_VALUE; } 
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; } 
} 

所得到的摺疊Stream可以最好的應用表現出的懶性質它無限流:

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4) 
     .filter(e -> e.getKey()>5) 
     .findFirst().ifPresent(e -> System.out.println(e.getValue())); 
+1

稍作修改:如果下游收集器合作(除了具有UNORDERED特性的除外),'groupingBy'的確保持原始流的順序。任何給定桶中元素的子集將按照它們出現在輸入中的順序呈現給下游收集器。 – 2014-09-04 12:41:11

+1

@Brian Goetz:是的,它*維護訂單,但我在答覆中說的是,它不依賴於組織訂單。順便說一句。這是我爲我的解決方案所做的測試案例之一:將我的解決方案返回的流收集到「Map」中必須使用相同的分類器生成與「groupingBy」完全相同的「Map」。 – Holger 2014-09-04 12:43:34

1

cyclops-react,我庫中的貢獻,同時提供sharding和分組funcitonality可能做你想做的。

ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...)) 
      .groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0))); 

groupedStatefullyWhile運算符允許根據批處理的當前狀態對元素進行分組。 ReactiveSeq是一個單線程順序流。

Map<Key, Stream<Value> sharded = 
        new LazyReact() 
       .fromCollection(FileUtils.readLines(...)) 
       .map(..) 
       .shard(shards, pair -> pair[0]); 

這將創建一個LazyFutureStream(實現java.util.stream.Stream),將處理文件中的數據和異步並行。這是懶惰的,不會開始處理,直到數據通過。

唯一需要注意的是,您需要預先定義碎片。即上面的'shards'參數是async.Queue的Map由分片的關鍵字(可能是任何pair [0]是?)映射的。

例如

Map<Integer,Queue<String>> shards; 

There is a sharding example with video heretest code here

0

它可以通過collapseStreamEx

final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } }; 

StreamEx.of(aa) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .forEach(System.out::println); 

做我們可以添加peeklimit,以驗證它是否懶惰計算:

StreamEx.of(aa) 
     .peek(System.out::println) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .limit(1) 
     .forEach(System.out::println); 
相關問題