2017-02-18 532 views
0

我正在研究個人愛好時間遊戲引擎,並且正在開發多線程批處理執行程序。我最初在各地使用併發無鎖隊列和std :: function來促進主線程和從屬線程之間的通信,但決定放棄它,以支持輕量級的處理方式,從而可以嚴格控制內存分配:函數指針和內存池。多線程C++:強制從內存中讀取,繞過緩存

無論如何,我碰到的一個問題:

函數指針,不管我怎麼努力,只得到正確地由一個線程讀取,而別人看了一個空指針,因此無法斷言。

我很確定這是一個緩存問題。我已經確認所有線程都具有相同的指針地址。我試着將它聲明爲volatile,intptr_t,std :: atomic,並且嘗試了各種各樣的cast-fu,並且線程似乎都忽略它並繼續閱讀它們的緩存副本。

我仿照型號覈對主機和從機,以確保併發性好,而且沒有活鎖或死鎖(前提是共享變量都正確同步)

void Executor::operator() (int me) { 
    while (true) { 
     printf("Slave %d waiting.\n", me); 
     { 
      std::unique_lock<std::mutex> lock(batch.ready_m); 
      while(!batch.running) batch.ready.wait(lock); 
      running_threads++; 
     } 
     printf("Slave %d running.\n", me); 
     BatchFunc func = batch.func; 
     assert(func != nullptr); 

     int index; 
     if (batch.store_values) { 
      while ((index = batch.item.fetch_add(1)) < batch.n_items) { 
       void* data = reinterpret_cast<void*>(batch.data_buffer + index * batch.item_size); 
       func(batch.share_data, data); 
      } 
     } 
     else { 
      while ((index = batch.item.fetch_add(1)) < batch.n_items) { 
       void** data = reinterpret_cast<void**>(batch.data_buffer + index * batch.item_size); 
       func(batch.share_data, *data); 
      } 
     } 

     // at least one thread finished, so make sure we won't loop back around 
     batch.running = false; 

     if (running_threads.fetch_sub(1) == 1) { // I am the last one 
      batch.done = true; // therefore all threads are done 
      batch.complete.notify_all(); 
     } 
    } 
} 

void Executor::run_batch() { 
    assert(!batch.running); 
    if (batch.func == nullptr || batch.n_items == 0) return; 

    batch.item.store(0); 

    batch.running = true; 
    batch.done = false; 
    batch.ready.notify_all(); 

    printf("Master waiting.\n"); 
    { 
     std::unique_lock<std::mutex> lock(batch.complete_m); 
     while (!batch.done) batch.complete.wait(lock); 
    } 
    printf("Master ready.\n"); 

    batch.func = nullptr; 
    batch.n_items = 0; 
} 

batch.func是另一個功能

template<typename SharedT, typename ItemT> 
void set_batch_job(void(*func)(const SharedT*, ItemT*), const SharedT& share_data, bool byValue = true) { 
    static_assert(sizeof(SharedT) <= SHARED_DATA_MAXSIZE, "Shared data too large"); 
    static_assert(std::is_pod<SharedT>::value, "Shared data type must be POD"); 
    assert(std::is_pod<ItemT>::value || !byValue); 
    assert(!batch.running); 
    batch.func = reinterpret_cast<volatile BatchFunc>(func); 
    memcpy(batch.share_data, (void*) &share_data, sizeof(SharedT)); 
    batch.store_values = byValue; 
    if (byValue) { 
     batch.item_size = sizeof(ItemT); 
    } 
    else { // store pointers instead of values 
     batch.item_size = sizeof(ItemT*); 
    } 
    batch.n_items = 0; 
} 

被設置在這裏是結構(和類型定義),它在處理與

typedef void(*BatchFunc)(const void*, void*); 
struct JobBatch { 
    volatile BatchFunc func; 
    void* const share_data = operator new(SHARED_DATA_MAXSIZE); 

    intptr_t const data_buffer = reinterpret_cast<intptr_t>(operator new (EXEC_DATA_BUFFER_SIZE)); 
    volatile size_t item_size; 
    std::atomic<int> item; // Index into the data array 
    volatile int n_items = 0; 

    std::condition_variable complete; // slave -> master signal 
    std::condition_variable ready; // master -> slave signal 
    std::mutex complete_m; 
    std::mutex ready_m; 

    bool store_values = false; 

    volatile bool running = false; // there is work to do in the batch 
    volatile bool done = false; // there is no work left to do 

    JobBatch(); 
} batch; 

如何確保所有必需的讀取和寫入batch.func在線程之間得到正確同步?

以防萬一它很重要:我使用Visual Studio並編譯x64調試Windows可執行文件。英特爾i5,Windows 10,8GB內存。

+0

爲什麼你在結構中有多個不穩定的東西? – tambre

+0

可能是一個好主意,提供[MVCE](http:// stackoverflow。COM /幫助/ MCVE)。 – tambre

+0

volatile的過度是我擔心除了函數指針之外的_other_變量導致緩存爭用過度反應的事情不工作,因爲我期望他們。 – Beefster

回答

0

因此,我在C++內存模型上做了一些閱讀,並設法使用atomic_thread_fence將解決方案拼湊在一起。一切都可能超級破碎,因爲我瘋了,不應該在這裏推出我自己的系統,但嘿,學習很有趣!

基本上,只要你寫完,你希望其他線程看到的東西,你需要調用atomic_thread_fence(std::memory_order_release)

在接收線程(S),你讀共享數據之前調用atomic_thread_fence(std::memory_order_acquire)

就我而言,釋放應該在等待條件變量之前立即完成,並且在使用由其他線程寫入的數據之前立即執行獲取。

這可確保其他人看到一個線程上的寫入。

我不是專家,所以這可能不是解決問題的正確方法,並可能面臨一定的厄運。例如,我仍然有一個死鎖/活鎖問題需要解決。 dr:它不完全是一個緩存事物:線程可能沒有完全彼此同步的數據,除非您使用原子內存隔離強制執行該操作。

+0

互斥體,信號量,其他鎖爲您執行內存屏蔽。如果此類鎖已正確使用,則不需要額外的內存防護。 – bazza