13

我有以下代碼片段,它基本上掃描需要執行的任務列表,然後將每個任務交給執行程序執行。ThreadPoolExecutor的處理異常

JobExecutor反過來創建另一個執行器(用於執行數據庫的東西......讀寫數據到隊列)並完成任務。

JobExecutor爲提交的任務返回Future<Boolean>。當其中一個任務失敗時,我想優雅地中斷所有線程並通過捕獲所有異常來關閉執行程序。我需要做些什麼改變?

public class DataMovingClass { 
    private static final AtomicInteger uniqueId = new AtomicInteger(0); 

    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator(); 

    ThreadPoolExecutor threadPoolExecutor = null ; 

    private List<Source> sources = new ArrayList<Source>(); 

    private static class IDGenerator extends ThreadLocal<Integer> { 
     @Override 
     public Integer get() { 
      return uniqueId.incrementAndGet(); 
     } 
    } 

    public void init(){ 

    // load sources list 

    } 

    public boolean execute() { 

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10, 
       10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
       new ThreadFactory() { 
        public Thread newThread(Runnable r) { 
         Thread t = new Thread(r); 
         t.setName("DataMigration-" + uniqueNumber.get()); 
         return t; 
        }// End method 
       }, new ThreadPoolExecutor.CallerRunsPolicy()); 

    List<Future<Boolean>> result = new ArrayList<Future<Boolean>>(); 

    for (Source source : sources) { 
        result.add(threadPoolExecutor.submit(new JobExecutor(source))); 
    } 

    for (Future<Boolean> jobDone : result) { 
       try { 
        if (!jobDone.get(100000, TimeUnit.SECONDS) && success) { 
         // in case of successful DbWriterClass, we don't need to change 
         // it. 
         success = false; 
        } 
       } catch (Exception ex) { 
        // handle exceptions 
       } 
      } 

    } 

    public class JobExecutor implements Callable<Boolean> { 

     private ThreadPoolExecutor threadPoolExecutor ; 
     Source jobSource ; 
     public SourceJobExecutor(Source source) { 
      this.jobSource = source; 
      threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
        new ThreadFactory() { 
         public Thread newThread(Runnable r) { 
          Thread t = new Thread(r); 
          t.setName("Job Executor-" + uniqueNumber.get()); 
          return t; 
         }// End method 
        }, new ThreadPoolExecutor.CallerRunsPolicy()); 
     } 

     public Boolean call() throws Exception { 
      boolean status = true ; 
      System.out.println("Starting Job = " + jobSource.getName()); 
      try { 

         // do the specified task ; 


      }catch (InterruptedException intrEx) { 
       logger.warn("InterruptedException", intrEx); 
       status = false ; 
      } catch(Exception e) { 
       logger.fatal("Exception occurred while executing task "+jobSource.getName(),e); 
       status = false ; 
      } 
      System.out.println("Ending Job = " + jobSource.getName()); 
      return status ; 
     } 
    } 
} 

回答

14

當您向執行器提交任務時,它將返回一個FutureTask實例。

FutureTask.get()將重新拋出任務拋出的任何異常,如ExecutorException

所以,當你遍歷List<Future>並調用get each,catch ExecutorException並調用有序關閉。

+0

好的。您是否看到任何其他缺陷或需要處理異常的位置? – jagamot 2010-03-31 18:09:50

1

子類ThreadPoolExecutor並覆蓋其protected afterExecute (Runnable r, Throwable t)方法。

如果您通過java.util.concurrent.Executors便利課程創建線程池(您不瞭解),請查看源代碼以瞭解它是如何調用ThreadPoolExecutor的。

0

由於您正在向ThreadPoolExecutor提交任務,因此異常會被FutureTask吞併。

看一看這個code

**Inside FutureTask$Sync** 

void innerRun() { 
    if (!compareAndSetState(READY, RUNNING)) 
     return; 

    runner = Thread.currentThread(); 
    if (getState() == RUNNING) { // recheck after setting thread 
     V result; 
     try { 
      result = callable.call(); 
     } catch (Throwable ex) { 
      setException(ex); 
      return; 
     } 
     set(result); 
    } else { 
     releaseShared(0); // cancel 
    } 

}

protected void setException(Throwable t) { 
    sync.innerSetException(t); 
} 

從上面的代碼,很顯然,setException方法捕捉Throwable。由於這個原因,FutureTask是,如果你使用的ThreadPoolExecutor

按Java documentationsubmit()」方法吞嚥所有的異常,可以在ThreadPoolExecutor

protected void afterExecute(Runnable r, 
          Throwable t) 

示例代碼擴展afterExecute()方法按文件:

class ExtendedExecutor extends ThreadPoolExecutor { 
    // ... 
    protected void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    if (t == null && r instanceof Future<?>) { 
     try { 
     Object result = ((Future<?>) r).get(); 
     } catch (CancellationException ce) { 
      t = ce; 
     } catch (ExecutionException ee) { 
      t = ee.getCause(); 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); // ignore/reset 
     } 
    } 
    if (t != null) 
     System.out.println(t); 
    } 
} 

您可以捕捉三種Exceptions方式

  1. Future.get()在接受的答案提示
  2. 包裹整個run()call()方法在try{}catch{}Exceptoion{}
  3. ThreadPoolExecutor方法的覆蓋afterExecute如上所示

要正常中斷其他線程,看看在低於SE問題:

How to stop next thread from running in a ScheduledThreadPoolExecutor

How to forcefully shutdown java ExecutorService