2011-11-03 75 views
1

讓我們在應用程序中使用一個經典的Executor。許多應用程序使用此執行程序進行一些計算,每次計算都可以取消,因此我可以在執行程序上調用shutdown()shutdownNow()Java執行程序部分關閉

但我只想關閉執行程序中的部分任務。可悲的是,我不能訪問Future對象,他們計算的實現

我想是這樣的Executor包裝,這是我能傳遞給計算和應支持(實際上是計算由演員框架jetlang支持)的私處由真正的執行者。類似這樣的:

// main application executor 
Executor applicationExecutor = Executors.newCachedThreadPool(); 

// starting computation 
Executor computationExecutor = new ExecutorWrapper(applicationExecutor); 
Computation computation = new Computation(computationExecutor); 
computation.start(); 

// cancelling computation 
computation.cancel(); 
// shutting down only computation tasks 
computationExecutor.shutdown(); 

// applicationExecutor remains running and happy 

還是其他的想法?

回答

2

對於那些,誰想要良好的兩端:有最終的解決方案,部分基於伊萬Sopov的回答。幸運的是,jetlang僅用於運行其任務Executor接口(而不是ExecutorService),所以我創建了包裝類,它支持僅由此包裝器創建的停止任務。

static class StoppableExecutor implements Executor { 
    final ExecutorService executor; 
    final List<Future<?>> futures = Lists.newArrayList(); 
    boolean stopped; 

    public StoppableExecutor(ExecutorService executor) { 
     this.executor = executor; 
    } 

    void stop() { 
     this.stopped = true; 
     synchronized (futures) { 
      for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) { 
       Future<?> future = iterator.next(); 
       if (!future.isDone() && !future.isCancelled()) { 
        System.out.println(future.cancel(true)); 
       } 
      } 
      futures.clear(); 
     } 
    } 

    @Override 
    public void execute(Runnable command) { 
     if (!stopped) { 
      synchronized (futures) { 
       Future<?> newFuture = executor.submit(command); 
       for (Iterator<Future<?>> iterator = futures.iterator(); iterator.hasNext();) { 
        Future<?> future = iterator.next(); 
        if (future.isDone() || future.isCancelled()) 
         iterator.remove(); 
       } 
       futures.add(newFuture); 
      } 
     } 
    } 
} 

使用,這是非常簡單的:

ExecutorService service = Executors.newFixedThreadPool(5); 
StoppableExecutor executor = new StoppableExecutor(service); 

// doing some actor stuff with executor instance 
PoolFiberFactory factory = new PoolFiberFactory(executor); 

// stopping tasks only created on executor instance 
// executor service is happily running other tasks 
executor.stop(); 

這就是全部。工作很好。

0

如果將Computation設置爲Runnable(並使用提供的Executor運行),直到設置布爾標誌爲止?沿線的東西:

public class Computation 
{ 
    boolean volatile stopped; 

    public void run(){ 
    while(!stopped){ 
    //do magic 
    } 

    public void cancel)(){stopped=true;} 
} 

你在做什麼基本上是停止線程。但是,它不會被垃圾回收,而是被重用,因爲它由Executor管理。查找「停止線程的正確方法是什麼?」。

編輯:請注意上面的代碼是相當原始的,因爲它假定while循環的主體需要很短的時間。如果沒有,檢查將很少執行,您會注意到取消任務和實際停止之間的延遲。

+0

謝謝。一個計算由許多任務組成。這些任務是框架的私有部分,我只能編寫從這些任務調用的回調函數。回調是相對較長的時間過程。我沒有Runnable對象,它被提交給Executor。是的,我想停止屬於當前計算的線程。 – mschayna

+0

正確。要立即停止它們(幾乎),你需要中斷。正確地做到這一點有點棘手,尤其是當任務在一部分池中運行時的線程。我無法比Brian Goetz在實踐中的Java併發性更好地解釋它,第7章。 – mbatchkarov

+0

我知道你的解決方案是停止線程的標準。我想要的就像「銀彈」:-),當我無法注入計算代碼時。 – mschayna

0

這樣的事情? 你可能會做部分關閉:

for (Future<?> future : %ExecutorServiceWrapperInstance%.getFutures()) { 
    if (%CONDITION%) { 
     future.cancel(true); 
    } 
} 

下面是代碼:

package com.sopovs.moradanen; 

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Future; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

public class ExecutorServiceWrapper implements ExecutorService { 

private final ExecutorService realService; 
private List<Future<?>> futures = new ArrayList<Future<?>>(); 

public ExecutorServiceWrapper(ExecutorService realService) { 
    this.realService = realService; 
} 

@Override 
public void execute(Runnable command) { 
    realService.execute(command); 
} 

@Override 
public void shutdown() { 
    realService.shutdown(); 

} 

@Override 
public List<Runnable> shutdownNow() { 
    return realService.shutdownNow(); 
} 

@Override 
public boolean isShutdown() { 
    return realService.isShutdown(); 
} 

@Override 
public boolean isTerminated() { 
    return realService.isTerminated(); 
} 

@Override 
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 
    return realService.awaitTermination(timeout, unit); 
} 

@Override 
public <T> Future<T> submit(Callable<T> task) { 
    Future<T> future = realService.submit(task); 
    synchronized (this) { 
     futures.add(future); 
    } 
    return future; 
} 

public synchronized List<Future<?>> getFutures() { 
    return Collections.unmodifiableList(futures); 
} 

@Override 
public <T> Future<T> submit(Runnable task, T result) { 
    Future<T> future = realService.submit(task, result); 
    synchronized (this) { 
     futures.add(future); 
    } 
    return future; 
} 

@Override 
public Future<?> submit(Runnable task) { 
    Future<?> future = realService.submit(task); 
    synchronized (this) { 
     futures.add(future); 
    } 
    return future; 
} 

@Override 
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { 
    List<Future<T>> future = realService.invokeAll(tasks); 
    synchronized (this) { 
     futures.addAll(future); 
    } 
    return future; 
} 

@Override 
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
     throws InterruptedException { 
    List<Future<T>> future = realService.invokeAll(tasks, timeout, unit); 
    synchronized (this) { 
     futures.addAll(future); 
    } 
    return future; 
} 

@Override 
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { 
    //don't know what to do here. Maybe this method is not needed by the framework 
    //than just throw new NotImplementedException(); 
    return realService.invokeAny(tasks); 
} 

@Override 
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
     throws InterruptedException, ExecutionException, TimeoutException { 
    //don't know what to do here. Maybe this method is not needed by the framework 
    //than just throw new NotImplementedException(); 
    return realService.invokeAny(tasks, timeout, unit); 
} 
} 
+1

謝謝,最後我有我自己的解決方案,這很容易從你的答案中得到啓發。 – mschayna