0

我正在使用Spring 4.3.8.RELEASE和Java 7.我想創建一個線程工廠來幫助管理我的應用程序中的某些工作人員。我宣佈我的線程工廠是這樣的如何等待我的線程工廠完成其所有任務?

<bean id="myprojectThreadFactory" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory"> 
    <constructor-arg value="prefix-"/> 
</bean> 
<bean id="myprojectTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    <property name="threadFactory" ref="myprojectThreadFactory"/> 
    <property name="corePoolSize" value="${myproject.core.thread.pool.size}" /> 
    <property name="maxPoolSize" value="${myproject.max.thread.pool.size}" /> 
</bean> 

但是,我在加入線程時遇到了問題。也就是說,我要等到所有的工作具有一定的任務,所以我有

m_importEventsWorker.work(); 
    m_threadExecutor.shutdown(); 
    System.out.println("done."); 

在我的線程池,像這樣

public void work(final MyWorkUnit pmyprojectOrg) 
{ 
    final List<MyWorkUnit> allOrgs = new ArrayList<MyWorkUnit>(); 
    if (pmyprojectOrg != null) 
    { 
     processData(pmyprojectOrg.getmyprojectOrgId()); 
    } else { 
     allOrgs.addAll(m_myprojectSvc.findAllWithNonEmptyTokens()); 
     // Cue up threads to execute 
     for (final MyWorkUnit myprojectOrg : allOrgs) 
     { 
      m_threadExecutor.execute(new Thread(new Runnable(){ 
       @Override 
       public void run() 
       { 
        System.out.println("started."); 
        processData(myprojectOrg.getmyprojectOrgId()); 
       } 
      })); 
     } // for 

然而,什麼會打印出執行繼續之前完成是

done. 
started. 
started. 

很清楚,我不是在等待。什麼是正確的方式來等待我的線程完成工作?

+0

你想對線程做什麼?他們正在處理相同的數據集還是分開的數據?你能描述一下流程嗎? – jeorfevre

+0

線程在不同的數據集上有用。我想知道什麼時候所有的線程都完成了他們的任務,但顯然「關機」方法不是要走的路。 – Dave

+0

您可以調用'm_threadExecutir.submit(...)',並且該方法將返回一個'FutureTask'實例,您應該調用'FutureTask'的'get()'方法來等待任務完成。 – dabaicai

回答

0

由於我使用Spring的ThreadPoolTask​​Executor類,我發現低於適合我的需求......

protected void waitForThreadPool(final ThreadPoolTaskExecutor threadPoolExecutor) 
{ 
    threadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true); 
    threadPoolExecutor.shutdown();  
    try { 
     threadPoolExecutor.getThreadPoolExecutor().awaitTermination(30, TimeUnit.SECONDS); 
    } catch (IllegalStateException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} // waitForThreadPool 
0

A CountDownLatch用給定的計數初始化。該計數通過調用countDown()方法遞減。等待此計數達到零的線程可以調用await()方法之一。調用await()會阻塞該線程,直到計數達到零。

您可以使用CountDownLatch主線程等待完成所有的task.You可以宣佈CountDownLatch與大小在主線程中調用await()方法任務CountDownLatch latch = new CountDownLatch(3);的數量等,每個任務完成呼叫countDown()

public void work(final MyWorkUnit pmyprojectOrg) 
{ 
    final List<MyWorkUnit> allOrgs = new ArrayList<MyWorkUnit>(); 
    if (pmyprojectOrg != null) 
    { 
     processData(pmyprojectOrg.getmyprojectOrgId()); 
    } else { 
     allOrgs.addAll(m_myprojectSvc.findAllWithNonEmptyTokens()); 

     CountDownLatch latch = new CountDownLatch(allOrgs.size()); 

     // Cue up threads to execute 
     for (final MyWorkUnit myprojectOrg : allOrgs) 
     { 
      m_threadExecutor.execute(new Thread(new Runnable(){ 
       @Override 
       public void run() 
       { 
        System.out.println("started."); 
        processData(myprojectOrg.getmyprojectOrgId()); 
        latch.countDown(); 
       } 
      })); 
     } 
     //After for loop 
     latch.await();  

實施例:

CountDownLatch latch = new CountDownLatch(3); 

Waiter  waiter  = new Waiter(latch); 
Decrementer decrementer = new Decrementer(latch); 

new Thread(waiter)  .start(); 
new Thread(decrementer).start(); 

public class Waiter implements Runnable{ 

    CountDownLatch latch = null; 

    public Waiter(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 
     try { 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println("Waiter Released"); 
    } 
} 

公共類遞減器實現Runnable {

CountDownLatch latch = null; 

public Decrementer(CountDownLatch latch) { 
    this.latch = latch; 
} 



public void run() { 

     try { 
      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

謝謝,但我不清楚我會如何將它應用於我題。你是說當我創建一個Runnable實例時,我應該創建你的「服務員」類嗎? – Dave

+0

創建一個倒數閂鎖實例並傳遞給每個可運行的實例並倒計時。調用工作調用後等待方法 –

+0

「m_threadExecutor.shutdown();」實際上是從JUnit測試中調用的,在測試中的b/c我想在thrads完成後檢查一些條件。我想避免將代碼添加到我的核心項目中以滿足JUnit測試,但如果沒有其他方法來執行此操作,我會考慮它。 – Dave

0

可以使用的ExecutorService創建一個固定的線程池,並檢查池大小是否爲空沒有:

ExecutorService executor = Executors.newFixedThreadPool(50); 

如果使用此執行運行任務,並通過使用@Scheduled固定利率或FIXEDDELAY定期檢查線程池的大小,你可以看到,如果他們完成與否。

ThreadPoolExecutor poolInfo = (ThreadPoolExecutor) executor; 
Integer activeTaskCount = poolInfo.getActiveCount(); 

if(activeTaskCount = 0) { 
    //If it is 0, it means threads are waiting for tasks, they have no assigned tasks. 
    //Do whatever you want here! 
} 
+0

這將告訴我在一個固定的時間點,如果我被證明,但我想實際上暫停程序執行,直到一切都完成。 – Dave