2011-09-02 74 views
1

所以,我對多線程相當陌生,並且最近在我的所有程序中都使用了這個想法。在我開始使用它之前,我確實想確保它是使用Executor,CompletionService和BlockingQueue以及Observer實現多線程的正確有效方法。我將在下面提供示例代碼,但讓我先快速解釋我認爲它的工作原理,也許這將有所幫助。正確使用ExecutorService,CompletionService,BlockingQueue和Observer的Java?

我擁有的第一件事是一個BlockingQueue所有任務都通過添加(Task任務)方法添加到此隊列中。在創建類之後,使用while(真)調用來調用run方法,從而阻塞隊列,直到將某些內容添加到任務隊列。

在run()queue.take()返回隊列中的項目之後,將某些內容添加到隊列中。然後,我將該項目傳遞給WorkerThread類,它可以處理它。該workerThread被添加到處理等待線程完成的CompletionService池中。

好吧,現在來的部分,我不知道是正確的。我也有一個實現了runnable的內部類,並在類初始化時啓動。它的工作是永遠循環調用pool.take()。所以,這基本上等待WorkerThreads中的一個完成。我讓完成服務處理這個。一旦take()獲得值,內部類將它傳遞給通知觀察者方法。

這是行不通的。它有點讓我擔心有一些主要的類會在任務隊列上運行一段時間(true),並且內部類也循環等待池以接收來自WorkerThread的結果?

這是一個示例實現。你怎麼看?

 public class HttpSchedulerThreaded extends Observable implements Runnable { 

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT 
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName()); 
private CompletionService<VulnInfo> pool; 
private ExecutorService executor ; 
private Thread responseWorkerThread; 
private HttpSchedulerWorker schedulerWorker; 
private boolean shouldRun = true; 
private CountDownLatch doneSignal; 
private String[] vulnClassesIgnoreRedirect; 
private boolean followRedirects; 
private boolean runJavascriptInResponse; 
private boolean isSSL; 
private int numThreadsInPool; 
private BlockingQueue<VulnInfo> queue; 
private boolean isRunning ; 
public HttpSchedulerThreaded(int numThreads) 
{ 
    numThreadsInPool = numThreads; 
    executor = Executors.newFixedThreadPool(numThreads); 
    doneSignal = new CountDownLatch(numThreads); 
    pool = new ExecutorCompletionService<VulnInfo>(executor); 
    schedulerWorker = new HttpSchedulerWorker(); 
    responseWorkerThread = new Thread(schedulerWorker); 
    queue = new LinkedBlockingQueue<VulnInfo>(); 
} 

public HttpSchedulerThreaded() 
{ 
    numThreadsInPool = 1; 
    executor = Executors.newFixedThreadPool(1); 
    doneSignal = new CountDownLatch(1); 
    pool = new ExecutorCompletionService<VulnInfo>(executor); 
    schedulerWorker = new HttpSchedulerWorker(); 
    responseWorkerThread = new Thread(schedulerWorker); 
    queue = new LinkedBlockingQueue<VulnInfo>(); 
} 

public void setThreadCount(int numThreads) 
{ 
    if(!isRunning){ 
    executor = Executors.newFixedThreadPool(numThreads); 
    doneSignal = new CountDownLatch(numThreads); 
    pool = new ExecutorCompletionService<VulnInfo>(executor); 
    numThreadsInPool = numThreads; 
    } 
} 


public void start() 
{ 
    if(!isRunning){ 
     responseWorkerThread.start(); 
     new Thread(this).start(); 
     isRunning = true; 
    } 

} 


public void add(VulnInfo info) { 
    queue.add(info); 
} 

@Override 
public void run() { 
    // TODO Auto-generated method stub 
    while(shouldRun) 
    { 
     try { 
      VulnInfo info = queue.take(); 
      Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal); 
      //System.out.println("submitting to pooler: " + info.getID()); 
      pool.submit(worker); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

/** 
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they 
* are complete it will send them to server for completion. 
* @author Steve 
* 
*/ 
class HttpSchedulerWorker implements Runnable{ 

    public void run() { 
     // TODO Auto-generated method stub 
     while(true) 
     { 
      VulnInfo vulnInfo = null; 
      try { 
       //System.out.println("taking finished request"); 
       Future<VulnInfo> tmp = pool.take(); 
      // Future<VulnInfo> tmp = pool.poll(); 
       if(tmp != null) 
        vulnInfo = tmp.get(); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 

      if(vulnInfo != null) 
      { 
       //System.out.println("updating all observers: " + vulnInfo.getID()); 
       updateObservers(vulnInfo); 
      } 



     } 
    } 

} 

回答

2

從我的經驗看,你的解決方案似乎沒問題。我有三個意見/建議:

  1. 一旦你創建執行responseWorkerThread = new Thread(schedulerWorker)responseWorkerThread.start()的一個新的線程,你已經基本上碎裂開來的兩個循環。這部分看起來沒問題。您確實似乎正確使用了Executor的API,但它確實看起來像您可能需要更多的代碼來停止HttpScheduledWorker線程並關閉作爲HttpSchedulerThreaded類的一部分的ExecutionCompletionService
  2. 我不確定您使用queue是否真的有必要。 ExecutionCompletionService已經使用BlockingQueue來管理提交給它的任務。
  3. 您的「問題」可能更適合於beta Code Review網站。