我有一個LinkedBlockingQueue任意挑選的容量爲10,並有1000行輸入文件。據我所知,我在服務類的main
方法中有一個ExecutorService
類型變量,它使用Executors.newSingleThreadExecutor()
- 一個單獨的線程調用buffer.readline(),直到文件line == null
,然後處理 - 在循環使用Executors.newSingleThreadExecutor()
- 經常線程處理行並將它們寫入輸出文件,直到!queue.take().equals("Stop")
。但是,在寫入一些行到文件後,當我處於調試模式時,我看到隊列的容量最終達到最大值(10),並且處理線程不執行queue.take()
。所有線程都處於running
狀態,但進程在queue.put()
後停止。什麼會導致這個問題,並且是否可以使用線程池或多個處理器變量的組合來解決,而不是使用單個變量?One Producer十個消費者文件處理與Executors.newSingleThreadExecutor()
綱要在服務main
方法的當前狀態:
//app settings to get values for keys within a properties file
AppSettings appSettings = new AppSettings();
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
maxProdThreads = 1;
maxConsThreads = 10;
ExecutorService execSvc = null;
for (int i = 0; i < maxProdThreads; i++) {
execSvc = Executors.newSingleThreadExecutor();
execSvc.submit(new ReadJSONMessage(appSettings,queue));
}
for (int i = 0; i < maxConsThreads; i++) {
execSvc = Executors.newSingleThreadExecutor();
execSvc.submit(new ProcessJSONMessage(appSettings,queue));
}
讀取方法的代碼:
buffer = new BufferedReader(new FileReader(inputFilePath));
while((line = buffer.readLine()) != null){
line = line.trim();
queue.put(line);
}
處理和寫代碼:
while(!(line=queue.take()).equals("Stop")){
if(line.length() > 10)
{
try {
if(processMessage(line, outputFilePath) == true)
{
++count;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public boolean processMessage(String line, String outputFilePath){
CustomObject cO = new CustomObject();
cO.setText(line);
writeToFile1(cO,...);
writeToFile2(cO,...);
}
public void writeOutputAToFile(CustomObject cO,...){
synchronized(cO){
...
org.apache.commons.io.FileUtils.writeStringToFile(...)
}
}
public void writeOutputBToFile(CustomObject cO,...){
synchronized(cO){
...
org.apache.commons.io.FileUtils.writeStringToFile(...)
}
}
請張貼您的讀寫代碼。我感覺到你以某種方式阻擋在那裏。 –
不是你的答案,但我建議在你的循環外部放置一個'Executors.newCachedThreadPool()'並使用它。這將允許您在完成後使用一次調用('exeSvc.shutdown()')關閉所有線程。 – teppic
進行線程轉儲以查看線程被阻塞的位置。 – teppic