2016-12-14 170 views
1

我必須在C++中編寫多生產者 - 消費者系統,但是我試圖將模型的每個部分(具有正確緩衝區的線程)放在一起。模型的基本功能是:我有一個執行函數的初始線程。返回的結果需要放入未確定數量的緩衝區中,因爲函數處理的每個元素都不相同,並且需要在單個線程中處理。然後,通過存儲在緩衝區中的數據,另一個線程需要獲取這些緩衝區的數據以執行另一個功能,並且需要將這個返回再次放入一些緩衝區。C++中的無鎖多生產者多個消費者

在我有創造了這個緩衝結構的那一刻:

template <typename T> 
class buffer { 
public: 
    atomic_buffer(int n); 
    int bufSize() const noexcept; 
    bool bufEmpty() const noexcept; 
    bool full() const noexcept; 
    ~atomic_buffer() = default; 


    void put(const T & x, bool last) noexcept; 
    std::pair<bool,T> get() noexcept; 

private: 
    int next_pos(int p) const noexcept; 

private: 
    struct item { 
    bool last; 
    T value; 
    }; 
    const int size_; 
    std::unique_ptr<item[]> buf_; 
    alignas(64) std::atomic<int> nextRd_ {0}; 
    alignas(64) std::atomic<int> nextWrt_ {0}; 
}; 

我還創建了一個vector結構存儲集合聯合國緩衝區,以滿足線程需要的人數不詳。

std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1; 

for(int i=0; i<n; i++){  
v1.push_back(std::unique_ptr<locked_buffer<std::pair<int,std::vector<std::vector<unsigned char>>>>> (new locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>(aux))); 
} 

編輯:

Flowchart of the model

+0

我不明白你的問題。您可以添加數據流的圖紙嗎? – 1000ml

+1

@ 1000ml編輯。 – giorgioW

+0

不知道更多的上下文,這看起來像有一個生產者線程可以創建任務。這些任務是並行執行的。所有結果都由一個線程使用。在我看來,你甚至不需要在緩衝區上進行任何同步。 – 1000ml

回答

1

不知道更多情況下,這看起來像一個標準的線程池的應用程序。您有不同的任務進入同步隊列(例如您在那裏的buffer類)。線程池中的每個工作線程輪詢此隊列並每次處理一個任務(例如,通過執行run()方法)。他們將結果寫回到另一個同步隊列中。

每個工作線程都有自己的線程本地輸入和輸出緩衝區對。它們不需要同步,因爲它們只能在所有者線程本身內進行訪問。

enter image description here

編輯:其實,我認爲這是可以簡化很多:只需使用一個線程池和一個同步的隊列。工作線程可以將新任務直接排入隊列。繪圖中的每個線程都將對應一種類型的任務,並實現一個共同的接口。 你不需要多個緩衝區。你可以使用多態性並將所有內容放在一個緩衝區中。

編輯2 - 線程池說明:
線程池只是一個概念。忘記池化方面,使用固定數量的線程。主要思想是:不用具有特定功能的多個線程,而是有N個線程可以處理任何類型的任務。其中N是CPU的內核數量。

enter image description here

enter image description here

工作者線程確實像下面這樣你可以改變。請注意,這是簡化的,但你應該明白。

void Thread::run(buffer<Task*>& queue) { 
    while(true) { 
     Task* task = queue.get(); 
     if(task) 
      task->execute(); 
     while(queue.isEmpty()) 
      waitUntilQueueHasElement(); 
    } 
} 

而你的任務實現一個共同的接口,這樣就可以把Task*指針到一個單一的隊列:

struct Task { 
    virtual void execute() = 0; 
} 

struct Task1 : public Task { 
    virtual void execute() override { 
     A(); 
     B1(); 
     C(); 
    } 
} 

... 

另外,請你幫個忙,並使用typedef;)

`std::vector<std::unique_ptr<locked_buffer<std::pair<int, std::vector<std::vector<unsigned char>>>>>> v1;` 

變成

typedef std::vector<std::vector<unsigned char>> vector2D_uchar; 
typedef std::pair<int, vector2D_uchar> int_vec_pair; 
typedef std::unique_ptr<locked_buffer<int_vec_pair>> locked_buffer_ptr; 
std::vector<locked_buffer_ptr> v1; 
+0

你能給我一個線程池聲明的例子嗎?感謝您的明確解釋,但線程池的概念在我的腦海中仍然有點混亂。 – giorgioW

+1

@giorgioW編輯。但是,我不知道你的最終目標是什麼,所以拿一粒鹽來做這個。 – 1000ml

相關問題