2009-06-28 37 views
1

我已經創建了一個併發的遞歸目錄遍歷和文件處理程序,它在所有並行計算完成後有時會掛起,但「主要」線程永遠不會繼續執行其他任務。併發目錄遍歷算法掛起問題

該代碼基本上是一個fork-join樣式的併發聚合器,在並行聚合完成後,它應該在Swing窗口中顯示結果。聚合的麻煩在於它需要生成一棵樹並將層次結構中葉節點的統計信息聚合起來。

我敢肯定,我犯了一個併發錯誤,但找不到它。我在帖子末尾添加了代碼的相關部分(爲簡潔起見代碼註釋被刪除,對於150行代碼感到抱歉,如果需要,我可以將它移到外部位置)。

上下文:Java 6u13,Windows XP SP3,Core 2 Duo CPU。

我的問題是:

可能是什麼原因這個隨機掛?

有沒有更好的方法來做併發目錄遍歷,也許在一個已經存在的庫的形式?

Doug lea(或Java 7)的fork-join框架是聚合/目錄遍歷的更好框架嗎?如果是的話,我應該如何重新考慮我的實現 - 在概念級別?

謝謝你的時間。

和代碼摘錄:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException { 
    CountUpDown count = new CountUpDown(); 
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors 
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors()); 

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length]; 
    for (int i = 0; i < jfes.length; i++) { 
     count.increment(); 
     jfes[i] = new JavaFileEvaluator(files[i], count, ex); 
     ex.execute(jfes[i]); 
    } 
    count.await(); 
    for (int i = 0; i < jfes.length; i++) { 
     count.increment(); 
     final JavaFileEvaluator jfe = jfes[i]; 
     ex.execute(new Runnable() { 
      public void run() { 
       jfe.aggregate(); 
      } 
     }); 

    } 
    // ------------------------------------- 
    // this await sometimes fails to wake up 
    count.await(); // <--------------------- 
    // ------------------------------------- 
    ex.shutdown(); 
    ex.awaitTermination(0, TimeUnit.MILLISECONDS); 
    return jfes; 
} 
public class JavaFileEvaluator implements Runnable { 
    private final File srcFile; 
    private final Counters counters = new Counters(); 
    private final CountUpDown count; 
    private final ExecutorService service; 
    private List<JavaFileEvaluator> children; 
    public JavaFileEvaluator(File srcFile, 
      CountUpDown count, ExecutorService service) { 
     this.srcFile = srcFile; 
     this.count = count; 
     this.service = service; 
    } 
    public void run() { 
     try { 
      if (srcFile.isFile()) { 
       JavaSourceFactory jsf = new JavaSourceFactory(); 
       JavaParser jp = new JavaParser(jsf); 
       try { 
        counters.add(Constants.FILE_SIZE, srcFile.length()); 
        countLines(); 
        jp.parse(srcFile); 
        Iterator<?> it = jsf.getJavaSources(); 
        while (it.hasNext()) { 
         JavaSource js = (JavaSource)it.next(); 
         js.toString(); 
         processSource(js); 
        } 
       // Some catch clauses here 
       } 
      } else 
      if (srcFile.isDirectory()) { 
       processDirectory(srcFile); 
      } 
     } finally { 
      count.decrement(); 
     } 
    } 
    public void processSource(JavaSource js) { 
     // process source, left out for brevity 
    } 
    public void processDirectory(File dir) { 
     File[] files = dir.listFiles(new FileFilter() { 
      @Override 
      public boolean accept(File pathname) { 
       return 
       (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
       && !pathname.getName().startsWith(".")) 
       || (pathname.isFile() && pathname.getName().endsWith(".java") 
       && pathname.canRead()); 
      } 
     }); 
     if (files != null) { 
      Arrays.sort(files, new Comparator<File>() { 
       @Override 
       public int compare(File o1, File o2) { 
        if (o1.isDirectory() && o2.isFile()) { 
         return -1; 
        } else 
        if (o1.isFile() && o2.isDirectory()) { 
         return 1; 
        } 
        return o1.getName().compareTo(o2.getName()); 
       } 
      }); 
      for (File f : files) { 
       if (f.isFile()) { 
        counters.add(Constants.FILE, 1); 
       } else { 
        counters.add(Constants.DIR, 1); 
       } 
       JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service); 
       if (children == null) { 
        children = new ArrayList<JavaFileEvaluator>(); 
       } 
       children.add(ev); 
       count.increment(); 
       service.execute(ev); 
      } 
     } 
    } 
    public Counters getCounters() { 
     return counters; 
    } 
    public boolean hasChildren() { 
     return children != null && children.size() > 0; 
    } 
    public void aggregate() { 
     // recursively aggregate non-leaf nodes 
     if (!hasChildren()) { 
      count.decrement(); 
      return; 
     } 
     for (final JavaFileEvaluator e : children) { 
      count.increment(); 
      service.execute(new Runnable() { 
       @Override 
       public void run() { 
        e.aggregate(); 
       } 
      }); 
     } 
     count.decrement(); 
    } 
} 
public class CountUpDown { 
    private final Lock lock = new ReentrantLock(); 
    private final Condition cond = lock.newCondition(); 
    private final AtomicInteger count = new AtomicInteger(); 
    public void increment() { 
     count.incrementAndGet(); 
    } 
    public void decrement() { 
     int value = count.decrementAndGet(); 
     if (value == 0) { 
      lock.lock(); 
      try { 
       cond.signalAll(); 
      } finally { 
       lock.unlock(); 
      } 
     } else 
     if (value < 0) { 
      throw new IllegalStateException("Counter < 0 :" + value); 
     } 
    } 
    public void await() throws InterruptedException { 
     lock.lock(); 
     try { 
      if (count.get() > 0) { 
       cond.await(); 
      } 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 

編輯添加了hasChildren()中JavaSourceEvaluator方法。

回答

1

在JavaFileEvaluator的聚合方法中,count.decrement()不在finally塊中調用。如果在聚合函數內部引發任何RuntimeException(可能在hasChildren方法中,我沒有看到它的正文?),調用遞減將永遠不會發生,並且CountUpDown將保持無限期等待。這可能是你看到的隨機死亡的原因。

對於第二個問題,我不知道java中的任何庫是否這樣做,但我沒有真正看過,對於非答案抱歉,但這不是我有任何機會之前使用。至於第三個問題,我認爲不管你是使用其他人提供的fork-join框架,還是繼續提供你自己的併發框架,最大的收穫是分離遍歷工作的邏輯來自管理並行性的邏輯目錄。您提供的代碼使用CountUpDown類來跟蹤所有線程何時完成,並且最終調用遍歷處理目錄遍歷的方法的遞增/遞減,這將導致惡夢追蹤錯誤。轉移到java7 fork-join框架將迫使你創建一個只處理實際遍歷邏輯的類,並將併發邏輯放到框架中,這對你來說可能是一個好方法。另一種選擇是繼續使用你在這裏的內容,但在管理邏輯和工作邏輯之間做一個清晰的描述,這將幫助你追蹤和修復這些類型的錯誤。

+0

哇,你是對的!我加了失蹤的孩子。我也會把這個包裹放到try-finally中,並且會在循環中運行一段代碼,謝謝! +1。你能否反映第二和第三個問題? – akarnokd 2009-06-29 18:59:58