107

在帶有lambda b93的JDK 8中,有一個類java.util.stream.Streams.zip in b93可用於壓縮流(這在教程Exploring Java8 Lambdas. Part 1 by Dhananjay Nene中進行了說明)。此功能:使用帶有lambda的JDK8壓縮流(java.util.stream.Streams.zip)

創建一個惰性且連續的組合流,其中的元素是組合兩個流的元素的 結果。

但是在b98中這個消失了。事實上,Streams類甚至不能在java.util.stream in b98中訪問。

是否已將此功能移動,如果是這樣,我如何使用b98精簡地壓縮數據流?

我心目中的應用是in this java implementation of Shen,在這裏我取代了

    拉鍊功能
  • static <T> boolean every(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)
  • static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred)

有相當冗長的代碼功能(不使用來自b98的功能)。

+0

啊剛發現它似乎已經被徹底刪除:http://mail.openjdk.java.net/pipermail/lambda-libs-spec-observers/2013-June/002029.html – artella

+0

可能[Java 8 java.util.stream.Streams]的副本(http://stackoverflow.com/questions/16780647/java-8-java-util-stream-streams) – assylias

+0

「探索Java8 Lambdas。第1部分」 - 新鏈接對於這篇文章是http://blog.dhananjaynene.com/2013/02/exploring-java8-lambdas-part-1/ –

回答

58

我需要這也讓我只是把從B93的源代碼,並把它放在一個「UTIL」級。我不得不稍微修改它以適應當前的API。

僅供參考這裏的工作代碼(把它在你自己的風險...):

public static<A, B, C> Stream<C> zip(Stream<? extends A> a, 
            Stream<? extends B> b, 
            BiFunction<? super A, ? super B, ? extends C> zipper) { 
    Objects.requireNonNull(zipper); 
    Spliterator<? extends A> aSpliterator = Objects.requireNonNull(a).spliterator(); 
    Spliterator<? extends B> bSpliterator = Objects.requireNonNull(b).spliterator(); 

    // Zipping looses DISTINCT and SORTED characteristics 
    int characteristics = aSpliterator.characteristics() & bSpliterator.characteristics() & 
      ~(Spliterator.DISTINCT | Spliterator.SORTED); 

    long zipSize = ((characteristics & Spliterator.SIZED) != 0) 
      ? Math.min(aSpliterator.getExactSizeIfKnown(), bSpliterator.getExactSizeIfKnown()) 
      : -1; 

    Iterator<A> aIterator = Spliterators.iterator(aSpliterator); 
    Iterator<B> bIterator = Spliterators.iterator(bSpliterator); 
    Iterator<C> cIterator = new Iterator<C>() { 
     @Override 
     public boolean hasNext() { 
      return aIterator.hasNext() && bIterator.hasNext(); 
     } 

     @Override 
     public C next() { 
      return zipper.apply(aIterator.next(), bIterator.next()); 
     } 
    }; 

    Spliterator<C> split = Spliterators.spliterator(cIterator, zipSize, characteristics); 
    return (a.isParallel() || b.isParallel()) 
      ? StreamSupport.stream(split, true) 
      : StreamSupport.stream(split, false); 
} 
+1

如果_either_stream是'SIZED',不是兩個都不是'SIZED'嗎? –

+4

我不這麼認爲。這兩個流都必須是「SIZED」才能運行。 它實際上取決於你如何定義壓縮。例如,你應該能夠壓縮兩個不同大小的流嗎?那麼結果流將是什麼樣子呢?我相信這就是爲什麼這個函數實際上被API省略了。有很多方法可以做到這一點,用戶可以決定哪些行爲應該是「正確」的行爲。你會丟棄更長的流中的元素還是填充較短的列表?如果是這樣,有什麼價值? – siki

+0

除非我遺漏了一些東西,否則不需要任何演員表(例如「Spliterator ')。 – Jubobs

6

Lazy-Seq庫提供zip功能。

https://github.com/nurkiewicz/LazySeq

這個庫深受啓發scala.collection.immutable.Stream旨在提供不可變,線程安全和易於使用的懶惰順序執行,可能是無限的。

9

您提到的類的方法已被移至Stream接口本身有利於默認方法。但似乎zip方法已被刪除。也許是因爲不清楚不同大小的流的默認行爲應該是什麼。但是實現期望的行爲是直接的:

static <T> boolean every(
    Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) { 
    Iterator<T> it=c2.iterator(); 
    return c1.stream().allMatch(x->!it.hasNext()||pred.test(x, it.next())); 
} 
static <T> T find(Collection<T> c1, Collection<T> c2, BiPredicate<T, T> pred) { 
    Iterator<T> it=c2.iterator(); 
    return c1.stream().filter(x->it.hasNext()&&pred.test(x, it.next())) 
     .findFirst().orElse(null); 
} 
+0

是不是'謂詞'你傳遞給過濾器*有狀態*?這違反了方法合約,特別是在並行處理流時不起作用。 – Andreas

+1

@Andreas:這裏的解決方案都不支持並行處理。由於我的方法不返回一個流,他們確保這些流不會並行運行。同樣,接受的答案的代碼返回一個可以平行轉換的流,但實際上並不會並行執行任何操作。也就是說,有狀態謂詞不鼓勵但不違反契約。如果確保狀態更新是線程安全的,它們甚至可以在並行上下文中使用。在某些情況下,它們是不可避免的,例如把一個流變成獨特的是一個有狀態的謂詞*本身*。 – Holger

+2

@Andreas:你可能會猜到爲什麼這些操作已經從Java API中刪除了...... – Holger

37

拉鍊是由protonpack library提供的功能之一。

Stream<String> streamA = Stream.of("A", "B", "C"); 
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut"); 

List<String> zipped = StreamUtils.zip(streamA, 
             streamB, 
             (a, b) -> a + " is for " + b) 
           .collect(Collectors.toList()); 

assertThat(zipped, 
      contains("A is for Apple", "B is for Banana", "C is for Carrot")); 
+0

:http://amaembo.github.io/streamex/javadoc/one/util/streamex/StreamEx.html#zip-java.util.List-java.util.List-java.util.function。 BiFunction- – tokland

1
public class Tuple<S,T> { 
    private final S object1; 
    private final T object2; 

    public Tuple(S object1, T object2) { 
     this.object1 = object1; 
     this.object2 = object2; 
    } 

    public S getObject1() { 
     return object1; 
    } 

    public T getObject2() { 
     return object2; 
    } 
} 


public class StreamUtils { 

    private StreamUtils() { 
    } 

    public static <T> Stream<Tuple<Integer,T>> zipWithIndex(Stream<T> stream) { 
     Stream<Integer> integerStream = IntStream.range(0, Integer.MAX_VALUE).boxed(); 
     Iterator<Integer> integerIterator = integerStream.iterator(); 
     return stream.map(x -> new Tuple<>(integerIterator.next(), x)); 
    } 
} 
19

正在壓縮使用JDK8與拉姆達(gist)兩個流。

public static <A, B, C> Stream<C> zip(Stream<A> streamA, Stream<B> streamB, BiFunction<A, B, C> zipper) { 
    final Iterator<A> iteratorA = streamA.iterator(); 
    final Iterator<B> iteratorB = streamB.iterator(); 
    final Iterator<C> iteratorC = new Iterator<C>() { 
     @Override 
     public boolean hasNext() { 
      return iteratorA.hasNext() && iteratorB.hasNext(); 
     } 

     @Override 
     public C next() { 
      return zipper.apply(iteratorA.next(), iteratorB.next()); 
     } 
    }; 
    final boolean parallel = streamA.isParallel() || streamB.isParallel(); 
    return iteratorToFiniteStream(iteratorC, parallel); 
} 

public static <T> Stream<T> iteratorToFiniteStream(Iterator<T> iterator, boolean parallel) { 
    final Iterable<T> iterable =() -> iterator; 
    return StreamSupport.stream(iterable.spliterator(), parallel); 
} 
+1

不錯的解決方案和(相對)緊湊!要求你在文件的頂部放置'import java.util.function。*;'和'import java.util.stream。*;'。 – sffc

+0

請注意,這是流上的終端操作。這意味着對於無限流,這種方法會崩潰 – smac89

1

AOL的cyclops-react,而我出力,還同時提供了通過extended Stream implementation,也實現了無功流接口ReactiveSeq,並通過它通過靜態方法標準提供了許多相同的功能StreamUtils荏苒功能, Java Streams。

List<Tuple2<Integer,Integer>> list = ReactiveSeq.of(1,2,3,4,5,6) 
                .zip(Stream.of(100,200,300,400)); 


    List<Tuple2<Integer,Integer>> list = StreamUtils.zip(Stream.of(1,2,3,4,5,6), 
                Stream.of(100,200,300,400)); 

它還提供了更廣泛的基於應用的壓縮。例如。

ReactiveSeq.of("a","b","c") 
       .ap3(this::concat) 
       .ap(of("1","2","3")) 
       .ap(of(".","?","!")) 
       .toList(); 

    //List("a1.","b2?","c3!"); 

    private String concat(String a, String b, String c){ 
    return a+b+c; 
    } 

即使配對每一個項目在一個流與每一個項目在其他

ReactiveSeq.of("a","b","c") 
       .forEach2(str->Stream.of(str+"!","2"), a->b->a+"_"+b); 

    //ReactiveSeq("a_a!","a_2","b_b!","b_2","c_c!","c2") 
0

這是偉大的能力。我不得不兩個流壓縮成一個地圖與一個流是所述鍵和另一個是值

Stream<String> streamA = Stream.of("A", "B", "C"); 
Stream<String> streamB = Stream.of("Apple", "Banana", "Carrot", "Doughnut");  
final Stream<Map.Entry<String, String>> s = StreamUtils.zip(streamA, 
        streamB, 
        (a, b) -> { 
         final Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<String, String>(a, b); 
         return entry; 
        }); 

System.out.println(s.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()))); 

輸出: {A =蘋果,B =香蕉,C =胡蘿蔔}

7

由於我能「T想象任何使用荏苒上比索引的那些(列表)等收藏的,我喜歡簡單的大風扇,這將是我的解決方案:

<A,B,C> Stream<C> zipped(List<A> lista, List<B> listb, BiFunction<A,B,C> zipper){ 
    int shortestLength = Math.min(lista.size(),listb.size()); 
    return IntStream.range(0,shortestLength).mapToObject(i -> { 
      return zipper.apply(lista.get(i), listb.get(i)); 
    });   
} 
12

如果你在你的項目有番石榴,您可以使用Streams.zip方法(加入番石榴21):

返回一個流,其中每個元素都是將streamA和streamB中的每個元素傳遞給函數的結果。結果流將只與兩個輸入流中較短的一樣長;如果一個流更長,其額外的元素將被忽略。結果流不能有效地分割。這可能會損害並行性能。

public class Streams { 
    ... 

    public static <A, B, R> Stream<R> zip(Stream<A> streamA, 
      Stream<B> streamB, BiFunction<? super A, ? super B, R> function) { 
     ... 
    } 
} 
0

我謙恭地提出這個實現。結果流被截斷爲兩個輸入流中較短的一個。

public static <L, R, T> Stream<T> zip(Stream<L> leftStream, Stream<R> rightStream, BiFunction<L, R, T> combiner) { 
    Spliterator<L> lefts = leftStream.spliterator(); 
    Spliterator<R> rights = rightStream.spliterator(); 
    return StreamSupport.stream(new AbstractSpliterator<T>(Long.min(lefts.estimateSize(), rights.estimateSize()), lefts.characteristics() & rights.characteristics()) { 
     @Override 
     public boolean tryAdvance(Consumer<? super T> action) { 
      return lefts.tryAdvance(left->rights.tryAdvance(right->action.accept(combiner.apply(left, right)))); 
     } 
    }, leftStream.isParallel() || rightStream.isParallel()); 
}