2016-03-02 148 views
3

處理文檔我有5個文件(說),我對他們每個人的一些處理。這裏的處理包括打開文檔/文件,讀取數據,執行一些文件操作(編輯文本等)。對於文檔操作,我可能會使用docx4j或apache-poi。但我的用例是這樣的 - 我想以某種方式並行處理這些4-5文件,利用我的CPU上可用的多個內核。每個文檔的處理都是相互獨立的。爪哇 - 並行

什麼是在Java中實現這種並行處理的最佳方式。我之前在java中使用過ExecutorService,在Thread中也使用過類。但我沒有像左右或StreamsRxJava新概念太多的想法。通過在Java 8中引入Java中的並行流,可以實現此任務嗎?什麼是更好地使用執行人/流/線程類等,如果流可以使用,請提供一個鏈接,我可以找到關於如何做到這一點一些教程。謝謝你的幫助!

回答

3

您可以同時使用以下模式使用Java流處理。

List<File> files = ... 
files.parallelStream().forEach(f -> process(f)); 

File[] files = dir.listFiles(); 
Stream.of(files).parallel().forEach(f -> process(f)); 

注:process不能在這個例子中拋出CheckedException。我建議你記錄它或返回一個結果對象。

1

如果您想了解ReactiveX,我會建議使用rxJava Observable.zip http://reactivex.io/documentation/operators/zip.html

在那裏你可以在這裏並行運行多個進程的例子:就像我說的

public class ObservableZip { 

    private Scheduler scheduler; 
    private Scheduler scheduler1; 
    private Scheduler scheduler2; 

    @Test 
    public void testAsyncZip() { 
      scheduler = Schedulers.newThread();//Thread to open and read 1 file 
      scheduler1 = Schedulers.newThread();//Thread to open and read 1 file 
      scheduler2 = Schedulers.newThread();//Thread to open and read 1 file 
      Observable.zip(obAsyncString(file1), obAsyncString1(file2), obAsyncString2(file3), (s, s2, s3) -> s.concat(s2) 
                         .concat(s3)) 
       .subscribe(result -> showResult("All files in one:", result)); 
     } 

     public void showResult(String transactionType, String result) { 
      System.out.println(result + " " + 
           transactionType); 
     } 

     public Observable<String> obAsyncString(File file) { 
      return Observable.just(file) 
        .observeOn(scheduler) 
        .doOnNext(val -> { 
         //Here you read your file 
        }); 
     } 

     public Observable<String> obAsyncString1(File file) { 
      return Observable.just(file) 
        .observeOn(scheduler1) 
        .doOnNext(val -> { 
         //Here you read your file 2 

        }); 
     } 

     public Observable<String> obAsyncString2(File file) { 
      return Observable.just(file) 
        .observeOn(scheduler2) 
        .doOnNext(val -> { 
         //Here you read your file 3 

        }); 
     } 
     } 

,只是在要了解ReactiveX,因爲如果它不是,在你的棧添加此框架來解決這一問題將是一個有點矯枉過正的情況下,我寧願前面的流並行解決方案

+0

不管怎麼說,這是也有幫助! –