2013-07-31 165 views
21

我一直在使用Java 8中的CompletionStage/CompletableFuture進行異步處理,效果很好。然而,有時我想要一個階段來執行迭代器/項目流的異步處理,而且似乎沒有辦法做到這一點。是否可以使用Java 8 Streams API進行異步處理?

具體來說,Stream.forEach()具有在調用所有項目之後的語義。我想同樣的事情,但有CompletionStage代替,例如:

CompletionStage<Void> done = stream.forEach(...); 
done.thenRun(...); 

如果流是通過異步流結果的支持,這將是比等待它是在上面的代碼本身完整的好得多。

是否有可能以某種方式用當前的Java 8 API來構造它?解決方法?

+0

嗨!我已經更新了我的答案。如果它會滿足你,請投票,我試圖贏得更多的聲譽;) –

+2

@Rickard可能已經太晚了:)但我找到了你想要的東西,如果我正確理解你的話。 http://www.reactive-streams.org/。 https://github.com/reactor/reactor –

回答

20

據我所知,streams API不支持異步事件處理。聽起來你想要的是類似於Reactive Extensions for .NET的東西,並且有一個名爲RxJava的Java端口,由Netflix創建。

RxJava支持許多與Java 8流(如映射和過濾器)相同的高級操作,並且是異步的。

更新:現在有一個reactive streams主動權的作品,它看起來像JDK 9將包括它至少部分雖然Flow級的支持。

+19

的確,RxJava就是你想要的。 Streams的設計中心主要是關於無延遲訪問(無論是來自數據結構還是生成函數)的數據; Rx的設計中心關於可能異步到達的無限事件流。 –

7

正如@KarolKrol所暗示的那樣,您可以使用CompletableFuture流來完成。

有一個基於JDK8流構建的庫,以便於使用名爲cyclops-reactCompletableFuture流。

要組成你的流,你可以使用獨眼巨人反應流暢的承諾ike API或者你可以使用簡單反應的Stages

+0

我忘了現在投票:) –

+0

@KarolKról而不是遊說upvotes,只是寫出最好的答案來解決更多的問題。 –

2

cyclops-react(我是該庫的作者),提供了一個用於處理Streams的類StreamUtils。 它提供的功能之一是futureOperations,它提供了對標準Stream終端操作(然後是一些)的訪問,並帶有扭曲 - Stream異步執行,結果在CompletableFuture內返回。 .eg

Stream<Integer> stream = Stream.of(1,2,3,4,5,6) 
             .map(i->i+2); 
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream, 
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

還有一個包裝流,並提供相同的功能,具有很好的流利API一便於學習類ReactiveSeq

CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6) 
             .map(i->i+2) 
             .futureOperations(
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

正如亞當指出cyclops-react FutureStreams被設計爲異步處理數據(通過將Futures和Streams混合在一起) - 它特別適用於涉及阻塞I/O(如讀取文件,進行db調用,進行其他調用等)的多線程操作。

1

可以生成一個流,將每個元素映射到CompletionStage,然後使用CompletionStage.thenCombine()收集結果,但是如果使用這樣的簡單代碼,結果代碼將不會更具可讀性。

CompletionStage<Collection<Result>> collectionStage = 
     CompletableFuture.completedFuture(
      new LinkedList<>() 
     ); 

    for (Request request : requests) { 
     CompletionStage<Result> resultStage = performRequest(request); 
     collectionStage = collectionStage.thenCombine(
      resultStage, 
      (collection, result) -> { 
       collection.add(result); 
       return collection; 
      } 
     ); 
    } 

    return collectionStage; 

這個例子可以很容易地轉換爲功能forEach不失去可讀性。但使用流的reducecollect需要額外的不太好的代碼。

更新:CompletableFuture.allOfCompletableFuture.join提供了另一種更可讀的方法,將未來結果的收集轉換爲未來的結果收集。

相關問題