2008-09-18 101 views
3

我需要一個非常快速的算法來完成以下任務。我已經實現了幾種完成它的算法,但它們對於我所需要的性能來說太慢了。它應該足夠快,可以在現代CPU上運行至少10萬次。它將用C++實現。幫助合併矢量的算法

我正在使用跨度/範圍,一個結構有一個開始和一個結束座標。

我有兩個跨度的向量(動態數組),我需要合併它們。一個矢量是src,另一個是dst。矢量按跨度起點座標排序,跨度在一個矢量內不重疊。

src向量中的跨度必須與dst向量中的跨度合併,使得生成的向量仍然排序並且沒有重疊。 IE瀏覽器。如果在合併期間檢測到重疊,則將兩個跨度合併爲一個。 (合併兩個跨度只是改變結構中的座標的問題。)

現在,還有一個catch,src向量中的跨度必須在合併期間「擴大」。這意味着一個常量將被添加到開始,另一個(更大的)常量將被添加到src中每個跨度的結束座標。這意味着src跨度擴大後可能會重疊。


我到目前爲止所做的是,它不能完全就地完成,需要某種臨時存儲。我認爲在src和dst總結的元素數量的線性時間內應該是可行的。

任何臨時存儲都可能在算法的多次運行之間共享。

我曾嘗試這兩種主要方法,其是太慢,有:

  1. 追加的src到dst的所有元素,它追加之前加寬每個元素。然後運行就地排序。最後,使用「read」和「write」指針遍歷結果向量,讀指針在寫指針前面運行,並在跨越時合併跨度。當所有元素都被合併(讀指針到達結尾)時,dst被截斷。

  2. 創建臨時工作向量。通過反覆從src或dst中選擇下一個元素併合併到工作向量中,如上所述進行天真合併。完成後,將工作向量複製到dst,替換它。

第一種方法具有排序爲O((M + N)*日誌(M + N)),而不是O(M + N),並具有一定程度的開銷的問題。這也意味着dst矢量必須增長得比實際需要的大得多。

第二個問題是大量複製和重新分配/釋放內存的主要問題。

如果您認爲需要,可以更改用於存儲/管理跨度/矢量的數據結構。

更新:忘了說數據集有多大。最常見的情況是在任何一個向量中有4到30個元素,並且dst是空的,或者src和dst中跨度之間有大量重疊。

回答

0

第二種方法如何不重複分配 - 換句話說,分配您的臨時向量一次,而不再分配它?或者,如果輸入向量足夠小(但不是常量大小),只需使用alloca而不是malloc。

此外,在速度方面,你可能想確保你的代碼是使用CMOV的排序,因爲如果代碼實際上分支的歸併的每一個迭代:

if(src1[x] < src2[x]) 
    dst[x] = src1[x]; 
else 
    dst[x] = src2[x]; 

的分支預測將在50%的時間內失敗,這將對性能產生巨大影響。有條件的移動可能會好得多,所以確保編譯器正在這樣做,如果沒有,嘗試哄騙它這樣做。

0

您在方法1中提到的排序可以縮減爲線性時間(從描述它的對數線性算起),因爲兩個輸入列表已經排序。只需執行合併排序的合併步驟。用適當的輸入範圍向量表示(例如單向鏈表),可以在原地完成。

http://en.wikipedia.org/wiki/Merge_sort

0

我不認爲嚴格的線性解決方案是可行的,因爲擴大SRC載體跨度可能在最壞的情況下導致他們都重疊(取決於常數的大小,你是增加)

這個問題可能在執行中,不在算法中;我建議剖析代碼爲您的解決方案之前,看看時間都花在

推理:

對於像英特爾的Core 2 Extreme QX9770真正的「現代」的CPU爲3.2GHz運行,可以預計約59,455 MIPS

對於100,000個矢量,您必須在594,550個指令中處理每個矢量。這是很多指示。

裁判:wikipedia MIPS

除了

,注意添加常數到src矢量跨度不會去對它們進行排序,這樣你就可以恢復正常的SRC矢量獨立跨越,再與DST矢量跨越合併它們;這應該可以減少原始算法的工作量

+0

100k實際上可能是一個下衝,我還沒有真正計算的數字。當我說「現代」的CPU時,我實際上在想「5年前的東西」,Athlon XP 3000+並不是不切實際的目標。 – jfs 2008-09-18 02:38:24

8

我們知道絕對最佳運行時運行時間是O(m + n),這是因爲您至少必須掃描所有數據以便能夠合併列表。鑑於此,你的第二種方法應該給你這種類型的行爲。

你有沒有介紹你的第二種方法來找出瓶頸?根據您所談論的數據量,很可能在指定的時間內完成您想要的操作實際上是不可能的。驗證這一點的一種方法是做一些簡單的事情,比如總結循環中每個向量中跨度的所有開始和結束值以及時間。基本上,這裏你正在爲向量中的每個元素做最少量的工作。這將爲您提供一個可獲得最佳性能的基準。

除此之外,您可以避免通過使用stl swap方法將元素按元素複製,並且可以將temp矢量預先分配給一定的大小,以避免在合併元素時觸發數組的擴展。

你可以考慮在你的系統中使用2個載體,每當你需要做的,你合併到使用的向量合併,然後交換(這類似於在圖形中使用雙緩衝)。這樣,每次進行合併時都不必重新分配向量。

但是,您最好先分析並找出自己的瓶頸。如果分配與實際合併過程相比是最小的,則需要找出如何更快地完成分配。

一些可能的附加加速可能來自直接訪問向量原始數據,這可以避免每次訪問數據時的邊界檢查。

+0

感謝提醒我std :: swap,它可能實際上是交易斷路器。我會回來後測試了這一點;) – jfs 2008-09-18 02:32:47

0

1是對的 - 完整排序比合並兩個排序列表慢。

所以你看着調整2(或全新的東西)。

如果將數據結構更改爲雙向鏈接列表,則可以將它們合併到恆定的工作空間中。

對列表節點使用固定大小的堆分配器,以減少每個節點的內存使用量並提高節點在內存中靠近在一起的機率,從而減少頁面丟失。

您可能能夠在線或在您最喜歡的算法書中查找代碼以優化鏈接列表合併。您需要對此進行自定義,以便在列表合併的同時進行跨度合併。要優化合並,首先要注意的是,對於每個運行的值來自同一側而沒有來自另一側的值,您可以一次將整個運行插入到dst列表中,而不是將每個節點插入轉。並且,您可以將每個插入的寫入操作保存在正常的列表操作中,只需將結尾保留爲「懸掛」即可,並知道您將在稍後對其進行修補。並且假設您不會在應用程序的其他任何位置執行刪除操作,則列表可以單鏈接,這意味着每個節點只寫一個。

至於10微秒運行時 - 那種取決於n和m ...

0

如果您最近實施仍是不夠快,你可能最終不得不看的替代方法。

你使用這個函數的輸出是什麼?

+0

那麼你錯過了一件事,不只是一個'三角洲',有兩個。跨度的左側和右側添加了不同的值,特別是右側的值比左側的值大。 – jfs 2008-09-18 03:06:30

0

我爲這個算法編寫了一個新的容器類,根據需要量身定製。這也給了我一個機會來調整我的程序周圍的其他代碼,同時有一點速度提升。

這比使用STL向量的舊實現速度快得多,但它基本上是相同的東西。但儘管速度更快,但速度還不夠快......不幸的是。

分析並不能揭示什麼是真正的瓶頸。該MSVC探查似乎有時放在了錯誤的所謂的「怪」(據說相同運行中廣泛分配不同的運行時間)和大多數呼叫越來越合併成一個大的縫隙。

縱觀生成的代碼的反彙編顯示,有一個非常大的量生成的代碼跳躍的,我認爲這可能是緩慢的主要原因吧。

class SpanBuffer { 
private: 
    int *data; 
    size_t allocated_size; 
    size_t count; 

    inline void EnsureSpace() 
    { 
     if (count == allocated_size) 
      Reserve(count*2); 
    } 

public: 
    struct Span { 
     int start, end; 
    }; 

public: 
    SpanBuffer() 
     : data(0) 
     , allocated_size(24) 
     , count(0) 
    { 
     data = new int[allocated_size]; 
    } 

    SpanBuffer(const SpanBuffer &src) 
     : data(0) 
     , allocated_size(src.allocated_size) 
     , count(src.count) 
    { 
     data = new int[allocated_size]; 
     memcpy(data, src.data, sizeof(int)*count); 
    } 

    ~SpanBuffer() 
    { 
     delete [] data; 
    } 

    inline void AddIntersection(int x) 
    { 
     EnsureSpace(); 
     data[count++] = x; 
    } 

    inline void AddSpan(int s, int e) 
    { 
     assert((count & 1) == 0); 
     assert(s >= 0); 
     assert(e >= 0); 
     EnsureSpace(); 
     data[count] = s; 
     data[count+1] = e; 
     count += 2; 
    } 

    inline void Clear() 
    { 
     count = 0; 
    } 

    inline size_t GetCount() const 
    { 
     return count; 
    } 

    inline int GetIntersection(size_t i) const 
    { 
     return data[i]; 
    } 

    inline const Span * GetSpanIteratorBegin() const 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<const Span *>(data); 
    } 

    inline Span * GetSpanIteratorBegin() 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<Span *>(data); 
    } 

    inline const Span * GetSpanIteratorEnd() const 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<const Span *>(data+count); 
    } 

    inline Span * GetSpanIteratorEnd() 
    { 
     assert((count & 1) == 0); 
     return reinterpret_cast<Span *>(data+count); 
    } 

    inline void MergeOrAddSpan(int s, int e) 
    { 
     assert((count & 1) == 0); 
     assert(s >= 0); 
     assert(e >= 0); 

     if (count == 0) 
     { 
      AddSpan(s, e); 
      return; 
     } 

     int *lastspan = data + count-2; 

     if (s > lastspan[1]) 
     { 
      AddSpan(s, e); 
     } 
     else 
     { 
      if (s < lastspan[0]) 
       lastspan[0] = s; 
      if (e > lastspan[1]) 
       lastspan[1] = e; 
     } 
    } 

    inline void Reserve(size_t minsize) 
    { 
     if (minsize <= allocated_size) 
      return; 

     int *newdata = new int[minsize]; 

     memcpy(newdata, data, sizeof(int)*count); 

     delete [] data; 
     data = newdata; 

     allocated_size = minsize; 
    } 

    inline void SortIntersections() 
    { 
     assert((count & 1) == 0); 
     std::sort(data, data+count, std::less<int>()); 
     assert((count & 1) == 0); 
    } 

    inline void Swap(SpanBuffer &other) 
    { 
     std::swap(data, other.data); 
     std::swap(allocated_size, other.allocated_size); 
     std::swap(count, other.count); 
    } 
}; 


struct ShapeWidener { 
    // How much to widen in the X direction 
    int widen_by; 
    // Half of width difference of src and dst (width of the border being produced) 
    int xofs; 

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call 
    SpanBuffer buffer; 

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst); 

    ShapeWidener(int _xofs) : xofs(_xofs) { } 
}; 


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst) 
{ 
    if (src.GetCount() == 0) return; 
    if (src.GetCount() + dst.GetCount() == 0) return; 

    assert((src.GetCount() & 1) == 0); 
    assert((dst.GetCount() & 1) == 0); 

    assert(buffer.GetCount() == 0); 

    dst.Swap(buffer); 

    const int widen_s = xofs - widen_by; 
    const int widen_e = xofs + widen_by; 

    size_t resta = src.GetCount()/2; 
    size_t restb = buffer.GetCount()/2; 
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin(); 
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin(); 

    while (resta > 0 || restb > 0) 
    { 
     if (restb == 0) 
     { 
      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); 
      --resta, ++spa; 
     } 
     else if (resta == 0) 
     { 
      dst.MergeOrAddSpan(spb->start, spb->end); 
      --restb, ++spb; 
     } 
     else if (spa->start < spb->start) 
     { 
      dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); 
      --resta, ++spa; 
     } 
     else 
     { 
      dst.MergeOrAddSpan(spb->start, spb->end); 
      --restb, ++spb; 
     } 
    } 

    buffer.Clear(); 
} 
+0

嘗試使用deque而不是vector。 Deque的內存分配較少,但不以連續的內存爲代價。 – moswald 2008-09-18 06:28:22

0

我會始終保持我的向量跨度排序。這使得實現算法更容易 - 並且可能在線性時間內完成。

  • 跨度最小遞增的順序
  • 然後跨越最大的遞減順序

您需要創建一個函數來做到:

OK,所以我跨度基於排序那。

然後,我會使用std :: set_union來合併向量(您可以在繼續之前合併多個向量)。

然後對於每個連續的套具有相同的最低跨度,你保持第一,並刪除其餘的(他們是第一跨度的子跨度)。

然後你需要合併你的跨度。在線性時間內,這應該是現在和可行的。

好的,現在就是這個把戲。不要試圖做到這一點。使用一個或多個臨時向量(並提前預留足夠的空間)。然後在最後,調用std :: vector :: swap將結果放入您選擇的輸入向量中。

我希望這足以讓你走。

0

你的目標系統是什麼?它是多核?如果是這樣,你可以考慮使用多線程算法

+0

我的目標系統是過去5年左右的桌面系統,我不能假設任何有關SMP支持或SIMD指令集的內容。 (我可以假設x86上有MMX,但就是這樣。) – jfs 2008-09-18 20:58:48