2013-10-25 97 views
1

我沒有太多的經驗,使多線程應用程序,但我覺得我的計劃是在一個點,它可以由具有多個線程中受益。我正在做一個更大規模的項目,涉及使用分類器(如機器學習中)對大約32000個客戶進行分類。我調試了該程序,發現需要大約一秒來對每個用戶進行分類。換句話說,這需要8.8小時才能完成!在Java中使用多個線程來縮短項目時間

有沒有什麼辦法可以運行4個線程,每個線程處理8000個用戶?第一個線程將處理1-8000,第二個8001-16000,第三個16001-23000,第四個23001-32000。此外,截至目前每個分類是通過調用另一個類的靜態函數...

然後當除主之一,其他線程應該結束完成。這是可行的嗎?如果是這樣,我將不勝感激,如果有人可以提供關於如何做到這一點的提示或步驟。我很熟悉關鍵部分的想法(等待/信號),但幾乎沒有經驗。

再次,任何幫助將非常感激!有關如何處理這種情況的提示和建議值得歡迎!不知道它的問題,但我有一個2.53 GHZ處理器速度的Core 2 Duo PC。

+0

通過實現上述內容,您不會減少計算時間。 – Kon

+0

如果您的工作完全受CPU限制,則您的有效並行性級別將嚴格限制爲CPU內核數量。 – SLaks

+2

確保任何共享狀態都是線程安全的。更好的是,擺脫任何共享狀態。 – SLaks

回答

2

這是Apache Hadoop的,這需要大約每個服務器數據的64MB塊太輕...但..它是阿卡演員的絕佳機會,而且,它恰好支持Java!

http://doc.akka.io/docs/akka/2.1.4/java/untyped-actors.html

基本上,你可以有4個演員做的工作,併爲他們完成分類的用戶,或可能會更好,用戶數量,他們要麼將它傳遞給一個「接收器」的演員,這使將信息導入到數據結構或輸出文件中,或者,通過每次寫入文件來執行並行I/O ..然後可以在完成所有文件時檢查/組合文件。

如果你想獲得更看中/強大的,你可以把遠程服務器上的演員。與他們溝通仍然非常容易,並且您將利用多臺服務器的CPU /資源。

我寫了一篇自己的阿卡演員,但它在斯卡拉,所以我就饒你。但是,如果你是谷歌「akka演員」,你會得到很多關於如何使用它的手持示例。勇敢一點,立即潛入並試驗。 「演員系統」是一個非常簡單的概念。我知道你可以做到這一點!

+0

哇,這聽起來非常神奇!我之前從數據庫類中聽說過Hadoop,但是Akka演員聽起來很有前途。我會深入探討這一點。我相信我會在這個過程中學到很多有用的東西。再次感謝您和其他所有回覆! – Tastybrownies

1

將數據拆分爲實現Runnable的對象,然後將它們傳遞給新線程。

在這種情況下,有四個以上的線程不會消滅你,但你不能獲得比核心更多的並行工作(如註釋中提到的那樣) - 如果線程多於核心,系統將不得不處理誰去的時候。

如果我有一類客戶,我想發出一個線程來8000個客戶放在首要更大集合我可能會做這樣的事情:

public class CustomerClassifier implements Runnable { 

    private customer[] customers; 

    public CustomerClassifier(customer[] customers) { 
    this.customers = customers; 
    } 
    @Override 
    public void run() { 
    for (int i=0; i< customers.length; i++) { 
     classify(customer);//critical that this classify function does not 
         //attempt to modify a resource outside this class 
         //unless it handles locking, or is talking to a database 
         //or something that won't throw fits about resource locking 
    } 
    } 
} 

然後發出這些線程別處

int jobSize = 8000; 
customer[] customers = new customer[jobSize](); 
int j = 0; 
for (int i =0; i+j< fullCustomerArray.length; i++) { 
    if (i == jobSize-1) { 
    new Thread(new CustomerClassifier(customers)).start();//run will be invoked by thread 
    customers = new Customer[jobSize](); 
    j += i; 
    i = 0; 
    } 
    customers[i] = fullCustomerArray[i+j]; 
} 

如果你有你的分類方法會影響同一資源的地方,你將不得不 實現鎖定,也將殺死獲得了一定程度的優勢。

併發是非常複雜的,需要大量的心思,我也建議看oracle的文檔http://docs.oracle.com/javase/tutorial/essential/concurrency/index.html (我知道鏈接是壞的,但希望在Oracle文檔不走動太多?)

免責聲明:我不是併發設計或多線程(不同主題)的專家。

+0

非常感謝您花時間寫出來的時間。我會牢記你的想法! – Tastybrownies

1

如果將輸入數組拆分爲4個相等的4個線程的子陣列,則不能保證所有線程同時完成。您最好將所有數據放在一個隊列中,讓所有工作線程從該通用隊列中提供。使用安全的BlockingQueue實現爲了不寫低級同步/等待/通知代碼。

+0

好點,我應該提到,他們這樣劃分他們形成了關於秩序無關的假設,並且在任務完成時並沒有跟蹤任何事情。 – Catalyst

0

從java 6我們有一些方便的併發使用。您可能需要考慮使用線程池來實現更清晰的實現。

package com.threads; 

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class ParalleliseArrayConsumption { 

    private int[] itemsToBeProcessed ; 

    public ParalleliseArrayConsumption(int size){ 
     itemsToBeProcessed = new int[size]; 
    } 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     (new ParalleliseArrayConsumption(32)).processUsers(4); 

    } 

    public void processUsers(int numOfWorkerThreads){ 
     ExecutorService threadPool = Executors.newFixedThreadPool(numOfWorkerThreads); 
     int chunk = itemsToBeProcessed.length/numOfWorkerThreads; 
     int start = 0; 
     List<Future> tasks = new ArrayList<Future>(); 
     for(int i=0;i<numOfWorkerThreads;i++){ 
      tasks.add(threadPool.submit(new WorkerThread(start, start+chunk))); 
      start = start+chunk; 
     } 
      // join all worker threads to main thread 
     for(Future f:tasks){ 
      try { 
       f.get(); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     threadPool.shutdown(); 
     while(!threadPool.isTerminated()){ 
     } 

    } 

    private class WorkerThread implements Callable{ 

     private int startIndex; 
     private int endIndex; 

     public WorkerThread(int startIndex, int endIndex){ 
      this.startIndex = startIndex; 
      this.endIndex = endIndex; 
     } 

     @Override 
     public Object call() throws Exception { 
      for(int currentUserIndex = startIndex;currentUserIndex<endIndex;currentUserIndex++){ 
       // process the user. Add your logic here 
       System.out.println(currentUserIndex+" is the user being processed in thread " +Thread.currentThread().getName()); 
      } 
      return null; 
     }  

    } 

}