2017-07-29 46 views
0

我正在使用Java ExecutorService(ThreadPool)執行任務&更新UI,而特定活動處於前景(可見)。Java ExecutorService - 任務/可調用不取消/中斷

問題: 我要的是,當用戶切換到另一個活動,我想停止/取消所有任務(不管是排隊或正在運行)。爲此,我必須在通過調用isDone()來檢查Future對象的狀態之後,對ExecutorService提交方法返回的Future對象使用ExecutorService shutdown/shutdownNow方法或取消(true)。這將設置中斷的相應線程標誌爲TRUE,我必須在我的可調用實現中檢查(Thread.currentThread.isInterrupted()),以確定是否中斷退出任務/線程。問題是我是否調用ExecutorService的shutdown方法或將來的取消這兩種情況下(true)方法很少的1 10倍,這將線程中斷標誌設置爲TRUE這是最終導致內存泄漏等

代碼:

線程池辛格爾頓實現(cancelAll-取消任務& shutdownExecutor到關機的ExecutorService):

private static class ThreadPoolManager { 

    private ExecutorService executorService; 
    private List<Future> queuedFutures; 
    private BlockingQueue<Runnable> blockingQueue; 

    private static ThreadPoolManager instance; 

    private ThreadPoolManager() { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); 
     queuedFutures = new ArrayList<>(); 
     blockingQueue = new LinkedBlockingDeque<>(); 
     executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
    } 

    static { 
     instance = new ThreadPoolManager(); 
    } 

    public static void submitItemTest(Callable<Object> callable) { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); 
     if(instance.executorService.isShutdown()){ 
      instance=new ThreadPoolManager(); 
     } 
     Future future = instance.executorService.submit(callable); 
     instance.queuedFutures.add(future); 
    } 

    public static void submitTestAll(Callable<Object> callable) { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); 
     if(instance.executorService.isShutdown()){ 
      instance=new ThreadPoolManager(); 
     } 
     cancelAll(); 
     Future future = instance.executorService.submit(callable); 
     instance.queuedFutures.add(future); 
    } 

    public static void cancelAll() { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); 
     instance.blockingQueue.clear(); 
     for (Future future : instance.queuedFutures) { 
      if (!future.isDone()) { 
       boolean cancelled = future.cancel(true); 
       MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); 
      } 
     } 
     instance.queuedFutures.clear(); 
    } 

    public static void shutdownExecutor(){ 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); 
     instance.executorService.shutdownNow(); 
    } 
} 

可贖回執行情況(正常迭代& if語句來檢查中斷) :

private Callable<Object> getTestAllCallable() { 
     return new Callable<Object>() { 
      @Override 
      public Object call() { 
       for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
        if (!Thread.currentThread().isInterrupted()) { 
          //someWork 

        } else { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling"); 
         return null; 
        } 
       } 
       return null; 
      } 
     }; 
    } 

活動/片段的onStop實現(調用取消任務&關機):

@Override 
public void onStop() { 
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called"); 
    ThreadPoolManager.cancelAll(); 
    ThreadPoolManager.shutdownExecutor(); 
    super.onStop(); 
} 

更新:由

變化:

  1. 使用遷可運行而不是可調用的。

  2. 現在不使用單例執行ExecutorService。

    private class ThreadPoolManager { 
    
        private ExecutorService executorService; 
        private List<Future> queuedFutures; 
        private BlockingQueue<Runnable> blockingQueue; 
    
        private ThreadPoolManager() { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); 
         queuedFutures = new ArrayList<>(); 
         blockingQueue = new LinkedBlockingDeque<>(); 
         executorService =getNewExecutorService(); 
        } 
    
        private ExecutorService getNewExecutorService(){ 
         return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
        } 
    
        private void submitItemTest(Runnable runnable) { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); 
         if(executorService.isShutdown()){ 
          executorService=getNewExecutorService(); 
         } 
         Future future = executorService.submit(runnable); 
         queuedFutures.add(future); 
        } 
    
        private void submitTestAll(Runnable runnable) { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); 
         if(executorService.isShutdown()){ 
          executorService=getNewExecutorService(); 
         } 
         cancelAll(); 
         Future future = executorService.submit(runnable); 
         queuedFutures.add(future); 
        } 
    
        private void cancelAll() { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); 
         blockingQueue.clear(); 
         for (Future future : queuedFutures) { 
          if (!future.isDone()) { 
           boolean cancelled = future.cancel(true); 
           MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); 
          } 
         } 
         queuedFutures.clear(); 
        } 
    
        private void shutdownExecutor(){ 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); 
         executorService.shutdownNow(); 
         blockingQueue.clear(); 
         queuedFutures.clear(); 
        } 
    } 
    

發現了罪魁禍首,但不是解決辦法呢。以下是正在運行的Runnables 1(isInterrupted返回true或InterupptedException和任務結束)的實現,但不包含其他。

工作Runnable接口(我用它測試):

new Runnable() { 
      @Override 
      public void run() { 
        int i=0; 
        while(!Thread.currentThread().isInterrupted()){ 
         try { 
          System.out.println(i); 
          Thread.currentThread().sleep(2000); 
         } catch (InterruptedException e) { 
          MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted"); 
          return; 
         } 
         i++; 
        } 
       } 
      } 

不工作(實際的代碼我想使用):

new Runnable(){ 
      @Override 
      public void run() { 
       for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
        if (!Thread.currentThread().isInterrupted()) { 

        } else { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)"); 
         break; 
        } 
       } 
      } 
     }; 

和1個可能的解決辦法是使用變量(布爾)作爲runnable中的中斷標誌,我認爲這是最後的手段,但很樂意瞭解這個錯誤。

回答

0

解決方案(出路): 所以最後我又繼續使用自定義的內部標誌(布爾)作爲一個線程中斷標誌將在每次迭代時由MyRunnable進行檢查(自定義標誌的自定義實現,以便每個可運行標誌都有一個標誌)。當需要在ExecutorService(ThreadPool)下取消線程時,我遍歷所有Future對象,並獲取它與MyRunnable相關聯,並將其中斷標誌(自定義標誌)設置爲true,而不是中斷/關閉線程。

ThreadPoolManager:

private class ThreadPoolManager { 

     private ExecutorService executorService; 
     private final Map<Future,MyRunnable> queuedFutures; 
     private final BlockingQueue<Runnable> blockingQueue; 

     private ThreadPoolManager() { 
      MyLogger.log(DEBUG, "Threadpool-created(constructor)"); 
      queuedFutures = new HashMap<>(); 
      blockingQueue = new LinkedBlockingDeque<>(); 
      executorService = getNewExecutorService(); 
     } 

     private ExecutorService getNewExecutorService() { 
      return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
     } 

     private void submitItemTest(MyRunnable runnable) { 
      MyLogger.log(DEBUG, "Threadpool-submitted item test"); 
      if (executorService.isShutdown()) { 
       executorService = getNewExecutorService(); 
      } 
      Future future = executorService.submit(runnable); 
      queuedFutures.put(future,runnable); 
     } 

     private void submitTestAll(MyRunnable runnable) { 
      MyLogger.log(DEBUG, "Threadpool-submitted test all"); 
      if (executorService.isShutdown()) { 
       executorService = getNewExecutorService(); 
      } 
      cancelAll(); 
      Future future = executorService.submit(runnable); 
      queuedFutures.put(future,runnable); 
     } 

     private void cancelAll() { 
      MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks"); 
      blockingQueue.clear(); 
      for (Future future : queuedFutures.keySet()) { 
       if (!future.isDone()) { 
        queuedFutures.get(future).continueRunning=false; 
        MyLogger.log(DEBUG, "Cancelled"); 
       } 
      } 
      queuedFutures.clear(); 
     } 

     private void shutdownExecutor() { 
      cancelAll(); 
      MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool"); 
      executorService.shutdown(); 
     } 
    } 

MyRunnable(實現可運行抽象類):

private abstract class MyRunnable implements Runnable { 
     boolean continueRunning=true; 
    } 

MyRunnable(抽象類MyRunnable的實例):

new MyRunnable() { 
     @Override 
     public void run() { 
      for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
       if (continueRunning) { 
         //someWork 
       } else { 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)"); 
        break; 
       } 
      } 
      System.out.println("ThreadPool: Test complete"); 
     } 
    }; 

現在,調用threadPoolManager.shutdownExecutor()關閉/中斷當前正在運行的所有線程。

0

根據ExecutorService文檔,關閉正在執行的任務是盡最大努力完成的。

因此,當你調用ExecutorService.shutdownNow()實施將嘗試關閉所有當前正在執行的任務。每個任務將保持運行,直到它檢測到它被中斷爲止。

爲了保證您的線程達到這一點在早期階段,它是一個好主意,在你的循環添加一個檢查線程是否被interuppted如下:

Thread.currentThread().isInterrupted(); 

通過使每此調用迭代你的線程會從實際的中斷中間隔很短的時間內檢測到中斷。

所以修改後的Callable代碼如下所示:

private Callable<Object> getTestAllCallable() { 
    return new Callable<Object>() { 
     @Override 
     public Object call() { 
      for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
       if(Thread.currentThread().isInterrupted()) { 
        return null; 
       } 
       if(someCondition) { 
        //someWork 
       } else { 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling"); 
        return null; 
       } 
      } 
      return null; 
     } 
    }; 
} 

順便說一句,有在使用Callable,如果你不打算從call()方法返回的任何值沒有意義。如果你需要在你的任務參數化類型只需要創建一個參數Runnable如下:

public class ParameterizedRunnable<T> implements Runnable { 
    private final T t; 

    public ParameterizedRunnable(T t) { 
     this.t = t; 
    } 

    public void run() { 
     //do some work here 
    } 
} 
+0

對不起,我現在編輯我的代碼,包括你已經陳述了實際上我已經這樣做,如問題描述中所述,但與其他代碼不相關的清除。 –

+0

這並不能解決你的問題? – zuckermanori

+0

我正在使用callable而不是runnable與想法獲取Future對象,同時將可調用對象提交給ExecutorService,現在知道它也可以使用runnable來完成。感謝您告訴我,我將更改代碼以使用runnable。 –