2016-05-15 436 views
6

我目前正在用C++編寫素數生成器。我先做了一個單線程版本,後來是一個多線程版本。C++線程應用程序比非線程運行速度慢

我發現如果我的程序生成的值小於100'000,那麼單線程版本比多線程版本要快。顯然我做錯了什麼。

我下面的代碼:

#include <iostream> 
#include <fstream> 
#include <set> 
#include <string> 
#include <thread> 
#include <mutex> 
#include <shared_mutex> 

using namespace std; 

set<unsigned long long> primeContainer; 
shared_mutex m; 

void checkPrime(const unsigned long long p) 
{ 
    if (p % 3 == 0) 
     return; 

    bool isPrime = true; 
    for (set<unsigned long long>::const_iterator it = primeContainer.cbegin(); it != primeContainer.cend(); ++it) 
    { 
     if (p % *it == 0) 
     { 
      isPrime = false; 
      break; 
     } 
     if (*it * *it > p) // check only up to square root 
      break; 
    } 

    if (isPrime) 
     primeContainer.insert(p); 
} 

void checkPrimeLock(const unsigned long long p) 
{ 
    if (p % 3 == 0) 
     return; 

    bool isPrime = true; 
    try 
    { 
     shared_lock<shared_mutex> l(m); 
     for (set<unsigned long long>::const_iterator it = primeContainer.cbegin(); it != primeContainer.cend(); ++it) 
     { 
      if (p % *it == 0) 
      { 
       isPrime = false; 
       break; 
      } 
      if (*it * *it > p) 
       break; 
     } 
    } 
    catch (exception& e) 
    { 
     cout << e.what() << endl; 
     system("pause"); 
    } 

    if (isPrime) 
    { 
     try 
     { 
      unique_lock<shared_mutex> l(m); 
      primeContainer.insert(p); 
     } 
     catch (exception& e) 
     { 
      cout << e.what() << endl; 
      system("pause"); 
     } 
    } 
} 

void runLoopThread(const unsigned long long& l) 
{ 
    for (unsigned long long i = 10; i < l; i += 10) 
    { 
     thread t1(checkPrimeLock, i + 1); 
     thread t2(checkPrimeLock, i + 3); 
     thread t3(checkPrimeLock, i + 7); 
     thread t4(checkPrimeLock, i + 9); 
     t1.join(); 
     t2.join(); 
     t3.join(); 
     t4.join(); 
    } 
} 

void runLoop(const unsigned long long& l) 
{ 
    for (unsigned long long i = 10; i < l; i += 10) 
    { 
     checkPrime(i + 1); 
     checkPrime(i + 3); 
     checkPrime(i + 7); 
     checkPrime(i + 9); 
    } 
} 

void printPrimes(const unsigned long long& l) 
{ 
    if (1U <= l) 
     cout << "1 "; 
    if (2U <= l) 
     cout << "2 "; 
    if (3U <= l) 
     cout << "3 "; 
    if (5U <= l) 
     cout << "5 "; 

    for (auto it = primeContainer.cbegin(); it != primeContainer.cend(); ++it) 
    { 
     if (*it <= l) 
      cout << *it << " "; 
    } 
    cout << endl; 
} 

void writeToFile(const unsigned long long& l) 
{ 
    string name = "primes_" + to_string(l) + ".txt"; 
    ofstream f(name); 

    if (f.is_open()) 
    { 
     if (1U <= l) 
      f << "1 "; 
     if (2U <= l) 
      f << "2 "; 
     if (3U <= l) 
      f << "3 "; 
     if (5U <= l) 
      f << "5 "; 

     for (auto it = primeContainer.cbegin(); it != primeContainer.cend(); ++it) 
     { 
      if (*it <= l) 
       f << *it << " "; 
     } 
    } 
    else 
    { 
     cout << "Error opening file." << endl; 
     system("pause"); 
    } 
} 

int main() 
{ 
    unsigned int n = thread::hardware_concurrency(); 
    std::cout << n << " concurrent threads are supported." << endl; 

    unsigned long long limit; 
    cout << "Please enter the limit of prime generation: "; 
    cin >> limit; 

    primeContainer.insert(7); 

    if (10 < limit) 
    { 
     //runLoop(limit); //single-threaded 
     runLoopThread(limit); //multi-threaded 
    } 

    printPrimes(limit); 
    //writeToFile(limit); 
    system("pause"); 
    return 0; 
} 

main功能,你會發現評論有關其功能單一和多線程。

這些之間的主要區別是使用鎖,共享容器迭代和獨特的插入。如果重要,我的CPU有4個內核。

爲什麼單線程版本更快?

+10

多線程永遠不會保證速度更快,而且速度通常更慢,特別是在有鎖的情況下,如果您不小心,可以將代碼恢復爲同步行爲。 – johnbakers

+3

我認爲這是由於創建線程的開銷。啓動每個線程並將其關閉的時間比通過並行計算節省的時間要大得多。與其創建一個線程來檢查單個數字,您可以使同一個線程檢查以1或3等結尾的每個數字。但是,在這種情況下,您必須調整素數測試以處理線程以不同的價格行事。 –

回答

4

對我來說,似乎你開始一個新的線程爲每個單一的素數檢查。這是不好的恕我直言,因爲線程啓動/關閉加上同步增加了每個素數的計算。開始一個線程可能會很慢。

我建議在主要for循環之外啓動這4個線程,並在每個線程中處理1/4的範圍。但是這可能需要一些額外的同步,因爲爲了檢查素數,上面的代碼顯然需要首先有sqrt N可用的素數。

從我的角度來看,使用Sieve of Erastothenes算法可能會更容易,這可能會更容易並行化而沒有任何鎖定(但可能仍會遇到稱爲「false sharing」的問題)。

編輯

在這裏,我快速創建使用Erastothenes的篩版本:高達1 000 000(MSVC 2013年發佈)

void processSieve(const unsigned long long& l, 
    const unsigned long long& start, 
    const unsigned long long& end, 
    const unsigned long long& step, 
    vector<char> &is_prime) 
{ 
    for (unsigned long long i = start; i <= end; i += step) 
     if (is_prime[i]) 
      for (unsigned long long j = i + i; j <= l; j += i) 
       is_prime[j] = 0; 
} 

void runSieve(const unsigned long long& l) 
{ 
    vector<char> is_prime(l + 1, 1); 
    unsigned long long end = sqrt(l); 
    processSieve(l, 2, end, 1, is_prime); 
    primeContainer.clear(); 
    for (unsigned long long i = 1; i <= l; ++i) 
     if (is_prime[i]) 
      primeContainer.insert(i); 
} 

void runSieveThreads(const unsigned long long& l) 
{ 
    vector<char> is_prime(l + 1, 1); 
    unsigned long long end = sqrt(l); 
    vector<thread> threads; 
    threads.reserve(cpuCount); 
    for (unsigned long long i = 0; i < cpuCount; ++i) 
     threads.emplace_back(processSieve, l, 2 + i, end, cpuCount, ref(is_prime)); 
    for (unsigned long long i = 0; i < cpuCount; ++i) 
     threads[i].join(); 
    primeContainer.clear(); 
    for (unsigned long long i = 1; i <= l; ++i) 
     if (is_prime[i]) 
      primeContainer.insert(i); 
} 

測量結果,素數:

runLoop: 204.02 ms 
runLoopThread: 43947.4 ms 
runSieve: 30.003 ms 
runSieveThreads (8 cores): 24.0024 ms 

高達10 0000 000:

runLoop: 4387.44 ms 
// runLoopThread disabled, taking too long 
runSieve: 350.035 ms 
runSieveThreads (8 cores): 285.029 ms 

時間包括向量的最終處理並將結果推送到素數集。

正如您所看到的,即使在單線程版本中,Sieve版本比您的版本快得多(對於您的互斥體版本,我必須將鎖更改爲常規互斥鎖,因爲MSVC 2013沒有shared_lock,所以結果可能比你更糟糕)。

但你可以看到多線程版本的篩仍然沒有像預期的那樣運行得很快(8個內核,即8個線程,線性加速比單線程快8倍),雖然沒有鎖定(交易關閉如果某些數字未被其他線程標記爲「無素數」,那麼某些數字可能會不必要地運行,但通常結果應該穩定,因爲每次只設置爲0,如果由多個線程同時設置則無關緊要。加速不是線性的原因很可能是因爲我之前提到的「false sharing」問題 - 寫入零的線程使每個其他緩存線無效。

11

你有幾個問題。

首先,你不斷地創建和銷燬線程。讓每個線程循環工作,直到沒有其他工作要做。

其次,你的鎖是方式太細,因此,你獲取這些方式過於頻繁。讓每個線程抓取一個包含100個數字的塊來測試,而不是每次測試一個,並讓它們一次插入每個塊中找到的素數。

+1

+1你可以完全避免鎖定,讓你的線程檢查不相交的數字集,並且每個寫入他們自己的容器。線程完成後,簡單地連接所有容器。 – denniskb

+2

這不像「線程讓事情變得更快」這麼簡單。使用線程可以使許多事情變得更快,但要做到這一點並不是很容易,以獲得顯着的性能提升。具有諷刺意味的是,這樣的「玩具」代碼比在實際代碼中困難得多。 –

+0

感謝您的建議,我做了一些改進:每個線程現在分別處理以1,3,7和9結尾的值。這解決了你的第一點。對於你的第二點,我仍然在想辦法。我喜歡@denniskb的建議,因爲我現在從未到達容器的末端,只有前半部分。我可以爲每個線程保留一個本地集合,將新找到的素數插入這些本地集合中,當我在某個線程中到達主集合的末尾時,發信號通知線程將其本地集合轉儲到主集合中。雖然,這聽起來像更多的鎖使用。 Dunno:/ –

2

由於註釋部分開始有點擁擠和OP表示在無鎖的解決方案感興趣,我提供以下這種方法的一個例子(半僞代碼):

vector<uint64_t> primes_thread1; 
vector<uint64_t> primes_thread2; 
... 

// check all numbers in [start, end) 
void check_primes(uint64_t start, uint64_t end, vector<uint64_t> & out) { 
    for (auto i = start; i < end; ++i) { 
     if (is_prime(i)) { // simply loop through all odds from 3 to sqrt(i) 
      out.push_back(i); 
     } 
    } 
} 

auto f1 = async(check_primes, 1, 1000'000, ref(primes_thread1)); 
auto f2 = async(check_primes, 1000'000, 2000'000, ref(primes_thread2)); 
... 

f1.wait(); 
f2.wait(); 
... 

primes_thread1.insert(
    primes_thread1.begin(), 
    primes_thread2.cbegin(), primes_thread2.cend() 
); 
primes_thread1.insert(
    primes_thread1.begin(), 
    primes_thread3.cbegin(), primes_thread3.cend() 
); 
... 
// primes_thread1 contains all primes found in all threads 

顯然,這可以通過參數化線程的數量和每個範圍的大小來很好地重構。我希望能夠更清楚地說明避免鎖定的概念,而不是首先分享任何狀態。

2

你的素數測試可能還有另一個問題。你從不測試7作爲除數。

此外,您的測試假定primeContainer已經包含了10和被測數字的sqare根之間的所有素數。如果使用線程填充容器,情況可能不是這樣。

如果您隨時添加數字(並且您的算法依賴於該數字)來填充容器,則可以使用std :: vector而不是std :: set來獲得更好的性能。

相關問題