2015-11-07 142 views
-1
public static void getTestData() { 

     try { 

      filename = "InventoryData_" + form_id; 

      PrintWriter writer = new PrintWriter("/Users/pnroy/Documents/" +filename + ".txt"); 
      pids = new ArrayList<ProductId>(); 
      GetData productList = new GetData(); 
      System.out.println("Getting productId"); 
      pids = productList.GetProductIds(form_id); 
      int perThreadSize = pids.size()/numberOfCrawlers; 
      ArrayList<ArrayList<ProductId>> perThreadData = new  
      ArrayList<ArrayList<ProductId>>(numberOfCrawlers); 
      for (int i = 1; i <= numberOfCrawlers; i++) { 
       perThreadData.add(new ArrayList<ProductId>(perThreadSize)); 
       for (int j = 0; j < perThreadSize; j++) { 
        ProductId ids = new ProductId(); 
        ids.setEbProductID((pids.get(((i - 1) * perThreadSize + j))).getEbProductID()); 
        ids.setECProductID((pids.get(((i - 1) * perThreadSize + j))).getECProductID()); 
        perThreadData.get(i - 1).add(ids); 
       } 
      } 

      BlockingQueue<String> q = new LinkedBlockingQueue<String>(); 
      Consumer c1 = new Consumer(q); 
      Thread[] thread = new Thread[numberOfCrawlers]; 
      for (int k = 0; k <= numberOfCrawlers; k++) { 
       // System.out.println(k); 
       GetCombinedData data = new GetCombinedData(); 
       thread[k] = new Thread(data); 
       thread[k].setDaemon(true); 
       data.setVal(perThreadData.get(k), filename, q); 
       thread[k].start(); 

       // writer.println(data.getResult()); 
      } 
      new Thread(c1).start(); 
      for (int l = 0; l <= numberOfCrawlers; l++) { 
       thread[l].join(); 

      } 
     } catch (Exception e) { 
     } 
    } 

這裏爬網程序的數量是線程的數量。java中有多線程的多線程

GetCombined類的運行方法具有以下的代碼: 的PID作爲來自 類CassController查詢一個API的主要方法perThreadData.get(K-1)通過,我得到一些處理後的字符串結果。

public void run(){ 
     try{ 

     for(int i=0;i<pids.size();i++){ 
      //System.out.println("before cassini"); 
     CassController cass = new CassController(); 
     String result=cass.getPaginationDetails(pids.get(i)); 
     queue.put(result); 
     // System.out.println(result); 
     Thread.sleep(1000); 
      } 
     writer.close(); 
     }catch(Exception ex){ 

     } 

Consumer.java具有下面的代碼:

public class Consumer implements Runnable{ 
    private final BlockingQueue queue; 
    Consumer(BlockingQueue q) { queue = q; } 
    public void run(){ 
     try { 
       while (queue.size()>0) 
       { 
        consume(queue.take()); 
       } 
      } catch (InterruptedException ex) 
       { 
       } 

    } 
    void consume(Object x) { 
     try{ 
     PrintWriter writer = new PrintWriter(new FileWriter("/Users/pnroy/Documents/Inventory", true)); 
     writer.println(x.toString()); 
     writer.close(); 
     }catch(IOException ex){ 

     } 

    } 

所以,如果我設置爬蟲的數量爲10,如果有500個記錄每個線程將處理50個records.I需要編寫結果成一個文件。我很困惑,我可以做到這一點,因爲它的線程數組和每個線程正在做一堆操作。

我試過使用阻塞隊列,但那是打印重複結果。我是新的多線程,不知道如何處理這種情況。 你可以請建議。

+1

您是否必須爲此使用一個線程數組?如果你不這樣做,請使用'ExecutorService'來代替,你會發現生活變得如此簡單。 – biziclop

+1

爲什麼你的程序會在稍後要加入join的線程上調用'setDaemon(true)'? –

+0

它可能會幫助其他程序員理解你的代碼,如果你要爲你的類和變量使用名詞式名稱,以及爲你的方法使用動詞式名稱。方法_do_東西,物件_are_東西。 –

回答

-1

隨着許多有用的高級併發類的引入,現在推薦不再直接使用Thread類。即使是BlockingQueue類也是相當低級的。

取而代之,您有一個不錯的應用程序CompletionService,它建立在ExecutorService的基礎上。以下示例顯示如何使用它。

您想要替換PartialResultTask(即主要處理髮生的位置)和System.out.println(這是您可能希望將結果寫入文件的位置)中的代碼。

public class ParallelProcessing { 

    public static void main(String[] args) { 
     ExecutorService executionService = Executors.newFixedThreadPool(10); 
     CompletionService<String> completionService = new ExecutorCompletionService<>(executionService); 

     // submit tasks 
     for (int i = 0; i < 500; i++) { 
      completionService.submit(new PartialResultTask(i)); 
     } 

     // collect result 
     for (int i = 0; i < 500; i++) { 
      String result = getNextResult(completionService); 
      if (result != null) 
       System.out.println(result); 
     } 

     executionService.shutdown(); 
    } 

    private static String getNextResult(CompletionService<String> completionService) { 

     Future<String> result = null; 
     while (result == null) { 
      try { 
       result = completionService.take(); 
      } catch (InterruptedException e) { 
       // ignore and retry 
      } 
     } 

     try { 
      return result.get(); 
     } catch (ExecutionException e) { 
      e.printStackTrace(); 
      return null; 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 


    static class PartialResultTask implements Callable<String> { 

     private int n; 

     public PartialResultTask(int n) { 
      this.n = n; 
     } 

     @Override 
     public String call() { 
      return String.format("Partial result %d", n); 
     } 
    } 
}