2016-07-27 22 views
0

以下是我的大學不再教授課程的舊練習(並行處理)。目標是創建和使用存儲庫來加速無鎖分類矢量的實現。我自己實現了內存庫,目標是留出足夠的內存來使用,所以我不必在LFSV中使用新的或刪除的內存。我相信我需要一個Get()函數來返回內存地址(不知道如何跟蹤未使用的內存),Store應該釋放內存(不知何故將它標記爲未使用)。如何爲多線程向量(LFSV)創建適當的內存池?

在LFSV內部(在我的介入之前工作得非常好),練習解釋了我應該用新替換和存儲(我們想要釋放的內存)替換新的和刪除。我如何去創建Get(如果這是不正確的)或Store功能來執行像一個適當的內存銀行?我還會在網上查閱任何參考或存儲庫示例,因爲我無法找到與內存庫和多線程相關的良好資源。

該程序沒有錯誤,但由於沒有正確管理內存條,所以它返回爲「失敗」。

#include <algorithm>//copy, random_shuffle 
#include <ctime> //std::time (NULL) to seed srand 
#include <iostream>  // std::cout 
#include <atomic>   // std::atomic 
#include <thread>   // std::thread 
#include <vector>   // std::vector 
#include <mutex>   // std::mutex 
#include <deque>   // std::deque 

class MemoryBank 
{ 
    std::deque< std::vector<int>* > slots; 
public: 
    MemoryBank() : slots(10000) 
    { 
     for (int i = 0; i<10000; ++i) 
     { 
      slots[i] = reinterpret_cast<std::vector<int>*>(new char[sizeof(std::vector<int>)]); 
     } 
    } 
    ~MemoryBank() 
    { 
     for (unsigned int i = 0; i < slots.size(); ++i) 
     { 
      delete slots[i]; 
     } 
     slots.clear(); 
    } 
    void * Get() 
    { 
     return &slots; 
    } 
    void Store(std::vector<int *> freeMemory) 
    { 
     return; 
    } 
}; 

class LFSV { 
    std::atomic< std::vector<int>* > pdata; 
    std::mutex wr_mutex; 
    MemoryBank mb; 

    public: 

    LFSV() : mb(), pdata(new (mb.Get()) std::vector<int>) {} 

    ~LFSV() 
    { 
     mb.~MemoryBank(); 
    } 

    void Insert(int const & v) { 
     std::vector<int> *pdata_new = nullptr, *pdata_old; 
     int attempt = 0; 
     do { 
      ++attempt; 
      delete pdata_new; 
      pdata_old = pdata; 
      pdata_new = new (mb.Get())std::vector<int>(*pdata_old); 

      std::vector<int>::iterator b = pdata_new->begin(); 
      std::vector<int>::iterator e = pdata_new->end(); 
      if (b==e || v>=pdata_new->back()) { pdata_new->push_back(v); } //first in empty or last element 
      else { 
       for (; b!=e; ++b) { 
        if (*b >= v) { 
         pdata_new->insert(b, v); 
         break; 
        } 
       } 
      } 
//   std::lock_guard<std::mutex> write_lock(wr_mutex); 
//   std::cout << "insert " << v << "(attempt " << attempt << ")" << std::endl; 
     } while (!(this->pdata).compare_exchange_weak(pdata_old, pdata_new )); 
     // LEAKing pdata_old since "delete pdata_old;" will cause errors 


//  std::lock_guard<std::mutex> write_lock(wr_mutex); 
//  std::vector<int> * pdata_current = pdata; 
//  std::vector<int>::iterator b = pdata_current->begin(); 
//  std::vector<int>::iterator e = pdata_current->end(); 
//  for (; b!=e; ++b) { 
//   std::cout << *b << ' '; 
//  } 
//  std::cout << "Size " << pdata_current->size() << " after inserting " << v << std::endl; 
    } 

    int const& operator[] (int pos) const { 
     return (*pdata)[ pos ]; 
    } 
}; 

LFSV lfsv; 

void insert_range(int b, int e) { 
    int * range = new int [e-b]; 
    for (int i=b; i<e; ++i) { 
     range[i-b] = i; 
    } 
    std::srand(static_cast<unsigned int>(std::time (NULL))); 
    std::random_shuffle(range, range+e-b); 
    for (int i=0; i<e-b; ++i) { 
     lfsv.Insert(range[i]); 
    } 
    delete [] range; 
} 

int reader(int pos, int how_many_times) { 
    int j = 0; 
    for (int i=1; i<how_many_times; ++i) { 
     j = lfsv[pos]; 
    } 
    return j; 
} 

std::atomic<bool> doread(true); 

void read_position_0() { 
    int c = 0; 
    while (doread.load()) { 
     std::this_thread::sleep_for(std::chrono::milliseconds(10)); 
     if (lfsv[0] != -1) { 
      std::cout << "not -1 on iteration " << c << "\n"; // see main - all element are non-negative, so index 0 should always be -1 
     } 
     ++c; 
    } 
} 

void test(int num_threads, int num_per_thread) 
{ 
    std::vector<std::thread> threads; 
    lfsv.Insert(-1); 
    std::thread reader = std::thread(read_position_0); 

    for (int i=0; i<num_threads; ++i) { 
     threads.push_back(std::thread(insert_range, i*num_per_thread, (i+1)*num_per_thread)); 
    } 
    for (auto& th : threads) th.join(); 

    doread.store(false); 
    reader.join(); 

    for (int i=0; i<num_threads*num_per_thread; ++i) { 
     //  std::cout << lfsv[i] << ' '; 
     if (lfsv[i] != i-1) { 
      std::cout << "Error\n"; 
      return; 
     } 
    } 
    std::cout << "All good\n"; 
} 

void test0() { test(1, 100); } 
void test1() { test(2, 100); } 
void test2() { test(8, 100); } 
void test3() { test(100, 100); } 

void (*pTests[])() = { 
    test0,test1,test2,test3//,test4,test5,test6,test7 
}; 


#include <cstdio> /* sscanf */ 
int main(int argc, char ** argv) { 
    if (argc==2) { //use test[ argv[1] ] 
     int test = 0; 
     std::sscanf(argv[1],"%i",&test); 
     try { 
      pTests[test](); 
     } catch(const char* msg) { 
      std::cerr << msg << std::endl; 
     } 
     return 0; 
    } 
} 
+0

這些更通常稱爲內存池。 – immibis

回答

0

reinterpret_cast真的是「我知道我在做什麼,相信我」劇組。編譯器會 - 如果可能的話 - 相信你。

但是,在這種情況下,這是完全錯誤的。 new char[]確實不是返回一個vector<int>*

+0

什麼是「reinterpret_cast?」的正確語法? –

+0

@ArrotheStudent:根據語法使用的_syntax_是正確的。但是,這背後的邏輯是有缺陷的。我看不到一個簡單的解決方法。這些插槽應該包含哪些內容?我看到4種方法和4種不同類型('new char [],矢量,矢量,void *')。 – MSalters

+0

我相信Get()函數用於爲LFSV類中的新放置位置使用內存。我相信新的佈局需要一個空白*。 LFSV是由存儲庫管理的矢量。 The line: slots [i] = reinterpret_cast *>(new char [sizeof(std :: vector )]); 在練習開始的記憶庫框架中,所以我不確定這是對還是錯,因爲我不熟悉reinterpret_cast。 –