2013-10-08 84 views
3

以下代碼在兩個std::vectorsv1v2上運行,每個包含多個128元素的向量。通過外部載體循環(使用i1i2)包含一個內部循環,旨在限制執行進一步複雜處理的i1i2的組合。大約99.9%的組合被過濾掉。我應該如何改善這個C++代碼的性能?

不幸的是,過濾循環是我的程序中的一個主要瓶頸 - 分析表明,整個運行時間的26%花費在行if(a[k] + b[k] > LIMIT)上。

const vector<vector<uint16_t>> & v1 = ... 
const vector<vector<uint16_t>> & v2 = ... 

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000 
    for(size_t i2 = 0; i2 < v2.size(); ++i2) { 

     const vector<uint16_t> & a = v1[i1]; 
     const vector<uint16_t> & b = v2[i2]; 

     bool good = true; 
     for(std::size_t k = 0; k < 128; ++k) { 
      if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000 
       good = false; 
       break; 
      } 
     } 
     if(!good) continue; 

     // Further processing involving i1 and i2 
    } 
} 

我認爲這個代碼的性能可以通過增加內存的局部性,加上可能的矢量化來改善。有關如何做到這一點的建議,或有關可以進行的其他改進的建議?

+0

應該可能轉到codereview,而不是(他們應該把那已經進入offtopic近票的清單) – PlasmaHH

+0

聽起來像分支預測問題。 sse會解決它 –

+0

數據結構可以簡化嗎? –

回答

0

首先,一個簡單的優化可以是這樣,但是編譯器可以做到這一點本身,所以我不知道它有多少能改善:

for(std::size_t k = 0; k < 128 && good; ++k) 
{ 
    good = a[k] + b[k] <= LIMIT; 
} 

其次,我認爲它能夠更好地保持第二個向量中的結果很好,因爲涉及i1和i2的任何 處理都可能會破壞CPU高速緩存。第三,這可能是主要優化,我認爲你可以重寫第二個循環爲: for(size_t i2 = i1; i2 < v2.size(); ++ i2)因爲您使用的是+對於交換的a和b向量的運算使得i1和i2的結果將與i2和i1相同。 爲此,您需要使v1和v2的大小相同。如果大小不同,則需要以不同的方式編寫迭代。

Fort,就我所見,你正在處理兩個矩陣,最好保留一個元素向量而不是向量向量。

希望這會有所幫助。 Razvan。

+0

'good&='否則你只要檢查最後一個'k'。 –

3

您可以應用SIMD到內環:

bool good = true; 
    for(std::size_t k = 0; k < 128; ++k) { 
     if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000 
      good = false; 
      break; 
     } 

如下:

#include <emmintrin.h> // SSE2 intrinsics 
#include <limits.h>  // SHRT_MIN 

// ... 

    // some useful constants - declare these somewhere before the outermost loop 

    const __m128i vLIMIT = _mm_set1_epi16(LIMIT + SHRT_MIN); // signed version of LIMIT 
    const __m128i vOFFSET = _mm_set1_epi16(SHRT_MIN);  // offset for uint16_t -> int16_t conversion 

// ... 

    bool good = true; 
    for(std::size_t k = 0; k < 128; k += 8) { 
     __m128i v, va, vb;    // iterate through a, b, 8 elements at a time 
     int mask; 
     va = _mm_loadu_si128(&a[k]); // get 8 elements from a[k], b[k] 
     vb = _mm_loadu_si128(&b[k]); 
     v = _mm_add_epi16(va, vb);  // add a and b vectors 
     v = _mm_add_epi16(v, vOFFSET); // subtract 32768 to make signed 
     v = _mm_cmpgt_epi16(v, vLIMIT); // compare against LIMIT 
     mask = _mm_maskmove_epi8(v); // get comparison results as 16 bit mask 
     if (mask != 0) {    // if any value exceeded limit 
      good = false;    // clear good flag and exit loop 
      break; 
     } 

警告:未經測試的代碼 - 可能需要調試,但一般的做法應該是聲。

+0

看起來不錯。爲了進一步改進,我會對齊數據 –

+0

@BЈовић:是的,很遺憾,您對std :: vector的對齊方式沒有太多控制。幸運的是,雖然在現代CPU上錯位加載並不太昂貴。 –

0

幾點建議:

  • 如所建議的評價,具有更好的存儲器局部性的陣列替換內部128元素矢量。
  • 此代碼看起來高度可並行化,你試過嗎?您可以拆分組合進行跨所有可用內核的篩選,然後重新平衡所收集的工作並將處理拆分到所有內核之間。

我實現了一個內部128個元素使用數組的版本,PPL進行並行化(需要VS 2012或更高版本)和一些SSE代碼進行過濾,並獲得了相當顯着的加速。取決於「進一步處理」究竟涉及什麼,可能會有稍微不同的結構化好處(在本例中,我不會在過濾後重新平衡工作)。

更新:我實現了Ben Voigt建議的緩存阻塞,並且獲得了更多的加速。

#include <vector> 
#include <array> 
#include <random> 
#include <limits> 
#include <cstdint> 
#include <iostream> 
#include <numeric> 
#include <chrono> 
#include <iterator> 

#include <ppl.h> 

#include <immintrin.h> 

using namespace std; 
using namespace concurrency; 

namespace { 
const int outerVecSize = 20000; 
const int innerVecSize = 128; 
const int LIMIT = 16000; 
auto engine = default_random_engine(); 
}; 

typedef vector<uint16_t> InnerVec; 
typedef array<uint16_t, innerVecSize> InnerArr; 

template <typename Cont> void randomFill(Cont& c) { 
    // We want approx 0.1% to pass filter, the mean and standard deviation are chosen to get close to that 
    static auto dist = normal_distribution<>(LIMIT/4.0, LIMIT/4.6); 
    generate(begin(c), end(c), [] { 
     auto clamp = [](double x, double minimum, double maximum) { return min(max(minimum, x), maximum); }; 
     return static_cast<uint16_t>(clamp(dist(engine), 0.0, numeric_limits<uint16_t>::max())); 
    }); 
} 

void resizeInner(InnerVec& v) { v.resize(innerVecSize); } 
void resizeInner(InnerArr& a) {} 

template <typename Inner> Inner generateRandomInner() { 
    auto inner = Inner(); 
    resizeInner(inner); 
    randomFill(inner); 
    return inner; 
} 

template <typename Inner> vector<Inner> generateRandomInput() { 
    auto outer = vector<Inner>(outerVecSize); 
    generate(begin(outer), end(outer), generateRandomInner<Inner>); 
    return outer; 
} 

void Report(const chrono::high_resolution_clock::duration elapsed, size_t in1Size, size_t in2Size, 
      const int passedFilter, const uint32_t specialValue) { 
    cout << passedFilter << "/" << in1Size* in2Size << " (" 
     << 100.0 * (double(passedFilter)/double(in1Size * in2Size)) << "%) passed filter\n"; 
    cout << specialValue << "\n"; 
    cout << "Elapsed time = " << chrono::duration_cast<chrono::milliseconds>(elapsed).count() << "ms" << endl; 
} 

void TestOriginalVersion() { 
    cout << __FUNCTION__ << endl; 

    engine.seed(); 
    const auto v1 = generateRandomInput<InnerVec>(); 
    const auto v2 = generateRandomInput<InnerVec>(); 

    int passedFilter = 0; 
    uint32_t specialValue = 0; 

    auto startTime = chrono::high_resolution_clock::now(); 

    for (size_t i1 = 0; i1 < v1.size(); ++i1) { // v1.size() and v2.size() about 20000 
     for (size_t i2 = 0; i2 < v2.size(); ++i2) { 

      const vector<uint16_t>& a = v1[i1]; 
      const vector<uint16_t>& b = v2[i2]; 

      bool good = true; 
      for (std::size_t k = 0; k < 128; ++k) { 
       if (static_cast<int>(a[k]) + static_cast<int>(b[k]) 
        > LIMIT) { // LIMIT is a const uint16_t: approx 16000 
        good = false; 
        break; 
       } 
      } 
      if (!good) continue; 

      // Further processing involving i1 and i2 
      ++passedFilter; 
      specialValue += inner_product(begin(a), end(a), begin(b), 0); 
     } 
    } 

    auto endTime = chrono::high_resolution_clock::now(); 

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue); 
} 

bool needsProcessing(const InnerArr& a, const InnerArr& b) { 
    static_assert(sizeof(a) == sizeof(b) && (sizeof(a) % 16) == 0, "Array size must be multiple of 16 bytes."); 
    static const __m128i mmLimit = _mm_set1_epi16(LIMIT); 
    static const __m128i mmLimitPlus1 = _mm_set1_epi16(LIMIT + 1); 
    static const __m128i mmOnes = _mm_set1_epi16(-1); 

    auto to_m128i = [](const uint16_t* p) { return reinterpret_cast<const __m128i*>(p); }; 
    return equal(to_m128i(a.data()), to_m128i(a.data() + a.size()), to_m128i(b.data()), [&](const __m128i& a, const __m128i& b) { 
     // avoid overflow due to signed compare by clamping sum to LIMIT + 1 
     const __m128i clampSum = _mm_min_epu16(_mm_adds_epu16(a, b), mmLimitPlus1); 
     return _mm_test_all_zeros(_mm_cmpgt_epi16(clampSum, mmLimit), mmOnes); 
    }); 
} 

void TestArrayParallelVersion() { 
    cout << __FUNCTION__ << endl; 

    engine.seed(); 
    const auto v1 = generateRandomInput<InnerArr>(); 
    const auto v2 = generateRandomInput<InnerArr>(); 

    combinable<int> passedFilterCombinable; 
    combinable<uint32_t> specialValueCombinable; 

    auto startTime = chrono::high_resolution_clock::now(); 

    const size_t blockSize = 64; 

    parallel_for(0u, v1.size(), blockSize, [&](size_t i) { 
     for (const auto& b : v2) { 
      const auto blockBegin = begin(v1) + i; 
      const auto blockEnd = begin(v1) + min(v1.size(), i + blockSize); 
      for (auto it = blockBegin; it != blockEnd; ++it) { 
       const InnerArr& a = *it; 
       if (!needsProcessing(a, b)) 
        continue; 

       // Further processing involving a and b 
       ++passedFilterCombinable.local(); 
       specialValueCombinable.local() += inner_product(begin(a), end(a), begin(b), 0); 
      } 
     } 
    }); 

    auto passedFilter = passedFilterCombinable.combine(plus<int>()); 
    auto specialValue = specialValueCombinable.combine(plus<uint32_t>()); 

    auto endTime = chrono::high_resolution_clock::now(); 

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue); 
} 

int main() { 
    TestOriginalVersion(); 
    TestArrayParallelVersion(); 
} 

在我8核心系統我看到一個不錯的加速,您的結果將取決於你有多少個內核具有等

TestOriginalVersion 
441579/400000000 (0.110395%) passed filter 
2447300015 
Elapsed time = 12525ms 
TestArrayParallelVersion 
441579/400000000 (0.110395%) passed filter 
2447300015 
Elapsed time = 657ms 
1

你已經得到了最有效的訪問模式爲v1 ,但對於外部循環的每次迭代,您將順序掃描整個v2。這是非常低效的,因爲v2訪問將不斷導致(L2和可能還有L3)緩存未命中。

更好的訪問模式,以增加循環嵌套,從而使外環步幅通過v1v2,並且兩個v1v2這是足夠小以適合於高速緩存的子段內的內部循環的過程元素。

基本上,而不是

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000 
    for(size_t i2 = 0; i2 < v2.size(); ++i2) { 

for(size_t i2a = 0; i2a < v2.size(); i2a += 32) { 
    for(size_t i1 = 0; i1 < v1.size(); ++i1) { 
     for(size_t i2 = i2a; i2 < v2.size() && i2 < i2a + 32; ++i2) { 

或者

size_t i2a = 0; 

// handle complete blocks 
for(; i2a < v2.size() - 31; i2a += 32) { 
    for(size_t i1 = 0; i1 < v1.size(); ++i1) { 
     for(size_t i2 = i2a; i2 < i2a + 32; ++i2) { 

     } 
    } 
} 

// handle leftover partial block 
for(size_t i1 = 0; i1 < v1.size(); ++i1) { 
    for(size_t i2 = i2a; i2 < v2.size(); ++i2) { 
    } 
} 

這樣,32 * 128 * sizeof (uint16_t)字節,或8KB的塊,將v2被加載到緩存,然後重複使用20,000次。

該改進與SIMD(SSE)向量化正交。它將與基於線程的並行機制相互作用,但可能是一種很好的方式。