2016-07-31 346 views
5

我在爲需要編排不同任務的程序實現優雅的功能樣式解決方案時遇到了麻煩。這是我想要實現的。任務編排實現

我有三個類欲編排其方法(簡化爲簡潔起見):

class TaskA { 
    public ResultA call() { 
     return new ResultA(); 
    } 
} 

class TaskB { 
    public ResultB call(ResultA a) { 
     return new ResultB(); 
    } 
} 

class TaskC { 
    public ResultC call(List<ResultB> resultBs) { 
     return new ResultC(); 
    } 
} 

我需要在平行和用於TaskA每個執行以執行TaskA n次,我需要執行TaskB 'n'次使用相應的TaskA的結果。最後,我需要執行TaskC一次,使用TaskB的所有調用的結果。

實現這一目標將是創建一個Callable封裝調用TaskATaskB終於在我的主線程,收集ResultBFutureList的s到執行的一種方式TaskC

class TaskATaskBCallable implements Callable<ResultB> { 
    private TaskA taskA ...; 
    private TaskB taskB ...; 

    public ResultB call() { 
     return taskB.call(taskA.call()); 
    } 
} 

而且在我的主線:

private ResultC orchestrate() { 
    ExecutorService service = ...; 
    List<Callable<ResultB>> callables = ...; 

    taskC.call(callables.map(callable -> 
     service.submit(callable)).map(Future::get).collect(Collectors.toList()); 
} 

我不喜歡這個解決方案的一件事是TaskATaskBCallable。這可能是一個不必要的類耦合TaskATaskB。此外,如果我必須將另一個任務鏈接到TaskATaskB,我將不得不修改TaskATaskBCallable也可能修改其名稱。我覺得我可以通過更智能地使用諸如CompletableFuturePhaser之類的Java併發庫類來擺脫它。

任何指針?

回答

0

我找到一個方法來做到這一點使用CompletableFuture

private ResultC orchestrate() { 
    ExecutorService service = ...; 
    int taskCount = ...; 

    List<CompletableFuture<ResultB>> resultBFutures = IntStream.rangeClosed(1, taskCount) 
        .mapToObj((i) -> CompletableFuture.supplyAsync(() -> new TaskA().call(), service)) 
        .map(resultAFuture -> resultAFuture.thenApplyAsync(resultA -> new TaskB().call(resultA), 
            service)) 
        .collect(Collectors.toList()); 

    return new TaskC().call(CompletableFuture.allOf(resultBFutures.toArray(new CompletableFuture[resultBFutures.size()])) 
        .thenApply(v -> resultBFutures.stream().map(CompletableFuture::join) 
            .collect(Collectors.toList())) 
        .join()); 
} 
-1

我覺得確實CompletableFuture將被證明最優雅:

int taskCount = 100; 

    List<ResultB> resultBs = IntStream.range(0, taskCount) 
      .mapToObj(i -> new TaskA()) 
      .map(taskA -> CompletableFuture.supplyAsync(taskA::call)) 
      .map(completableFutureA -> completableFutureA.thenApplyAsync(new TaskB()::call)) 
      .collect(Collectors.toList()) // collect, in order to kick off the async tasks 
      .stream() 
      .map(CompletableFuture::join) 
      .collect(Collectors.toList()); 
    return new TaskC().call(resultBs); 
+0

不完全是,你的'CompletableFuture :: join'使得整個事情連載。你的解決方案可以串行執行以下圖形:'TaskA-> TaskB-> TaskA-> TaskB ....-> TaskC' –

+0

@SwarangaSarma你說得對,我需要在加入之前收集,或者懶惰的評估流螺絲我:) – bowmore