所以,我對多線程相當陌生,並且最近在我的所有程序中都使用了這個想法。在我開始使用它之前,我確實想確保它是使用Executor,CompletionService和BlockingQueue以及Observer實現多線程的正確有效方法。我將在下面提供示例代碼,但讓我先快速解釋我認爲它的工作原理,也許這將有所幫助。正確使用ExecutorService,CompletionService,BlockingQueue和Observer的Java?
我擁有的第一件事是一個BlockingQueue所有任務都通過添加(Task任務)方法添加到此隊列中。在創建類之後,使用while(真)調用來調用run方法,從而阻塞隊列,直到將某些內容添加到任務隊列。
在run()queue.take()返回隊列中的項目之後,將某些內容添加到隊列中。然後,我將該項目傳遞給WorkerThread類,它可以處理它。該workerThread被添加到處理等待線程完成的CompletionService池中。
好吧,現在來的部分,我不知道是正確的。我也有一個實現了runnable的內部類,並在類初始化時啓動。它的工作是永遠循環調用pool.take()。所以,這基本上等待WorkerThreads中的一個完成。我讓完成服務處理這個。一旦take()獲得值,內部類將它傳遞給通知觀察者方法。
這是行不通的。它有點讓我擔心有一些主要的類會在任務隊列上運行一段時間(true),並且內部類也循環等待池以接收來自WorkerThread的結果?
這是一個示例實現。你怎麼看?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}
public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}
}
public void add(VulnInfo info) {
queue.add(info);
}
@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}
}
}
}