2017-05-24 127 views
1

我正在嘗試開發一種管道,其中首先讀取和處理數據,操作一次,以不同方式操作並顯示數據。我有一個設計,其中數據IO饋送到第一個操縱器讀取的緩衝區中。隨後,第一個操縱器寫入另一個緩衝區,該緩衝區在第二個操縱器可能的情況下被讀取。最後,將第二個操縱器的輸出寫入顯示緩衝區,然後由可視化工具讀取並使用OpenGL進行顯示。在流水線執行中採用並行處理

在我看來,這是一個相當直接的並行問題,其中每個任務都有自己的線程並通過數據緩衝區進行通信。然而,我在線程程序中遇到的所有教程似乎都表明,多線程是一些中間件(如OpenMP)決定如何劃分工作負載。

我是新開發多線程應用程序,所以這可能是一個愚蠢的問題,但是我所描述的是可行的,可以用OpenMP等中間件來完成嗎?我意識到顯而易見的答案是「嘗試它」,並且我想,但是這些教程沒有闡明如何嘗試它。

+0

什麼是「緩衝塊」的含義實驗?函數調用塊,而不是數據結構。 –

+0

@KerrekSB:你說得對。我的意思是讀取電話會阻塞,直到有緩衝區中的數據 – marcman

+0

我想你應該看看[消費者/製造商問題](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem ) – cbuchart

回答

1

OpenMP更適合易於跨多個內核(SIMD)的算法。其他情況是可能的,但在你的情況下,我認爲直接使用線程會更好,並且更容易編碼和維護。

我將我的答案分爲兩部分:一般解決方案沒有 OpenMP,以及一些使用OpenMP的特定更改。

正如在評論中提到的,你面對生產者/消費者問題,但是兩次:一個線程正在填充一個緩衝區(產生一個項目),然後必須通過第二個線程讀取(並修改) )。你的問題的特殊性在於,這第二個線程也是一個生產者(要繪製的圖像),第三個線程是負責使用它的人(可視化器)。如你所知,P/C問題是通過使用一個緩衝區(可能是一個循環緩衝區或一個生產項目隊列)來解決的,其中緩衝區的每個元素都被標記爲生成或消耗,並且線程具有獨佔性訪問時添加或從中獲取項目。


讓我們在下面的示例程序中使用隊列方法處理您的問題。

  • 生產物品將存儲在隊列中。隊列的前面包含最早的元素,這些元素必須先被消費。
  • 有兩個隊列:一個用於第一個操縱器生成的數據(並被第二個操縱器使用),另一個用於第二個操縱器生成的數據(並且將由另一個線程顯示)。
  • 生產階段很簡單:獲得對相應隊列的獨佔訪問權,並在最後插入元素。
  • 消費類似,但必須等待隊列至少有一個元素(非空)。
  • 我添加了一些來模擬其他操作。
  • 停止條件僅用於說明目的。

注:我假設你有機會獲得一個C++編譯器11爲簡單起見。使用其他API的實現相對相似。

#include <iostream> 
#include <thread> 
#include <mutex> 
#include <atomic> 
#include <chrono> 
#include <list> 

using namespace std::chrono_literals; 

std::mutex g_data_produced_by_m1_mutex; 
std::list<int> g_data_produced_by_m1; 

std::mutex g_data_produced_by_m2_mutex; 
std::list<int> g_data_produced_by_m2; 

std::atomic<bool> stop = false; 

void manipulator1_kernel() 
{ 
    while (!stop) { 
    // Producer 1: generate data 
    { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex); 
     g_data_produced_by_m1.push_back(rand()); 
    } 
    std::this_thread::sleep_for(100ms); 
    } 
} 

void manipulator2_kernel() 
{ 
    int data; 

    while (!stop) { 
    // Consumer 1 
    while (!stop) { // wait until there is an item to be consumed 
     { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m1_mutex); 
     if (!g_data_produced_by_m1.empty()) { // is there data to be consumed? 
      data = g_data_produced_by_m1.front(); // consume 
      g_data_produced_by_m1.pop_front(); 
      break; 
     } 
     } 
     std::this_thread::sleep_for(100ms); 
    } 

    // Producer 2: modify and send to the visualizer 
    { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex); 
     g_data_produced_by_m2.push_back(5 * data); 
    } 

    std::this_thread::sleep_for(100ms); 
    } 
} 

void visualizer_kernel() 
{ 
    int data; 

    while (!stop) { 
    // Consumer 2 
    while (!stop) { // wait until there is an item to be visualized 
     { 
     std::lock_guard<std::mutex> lock(g_data_produced_by_m2_mutex); 
     if (!g_data_produced_by_m2.empty()) { 
      data = g_data_produced_by_m2.front(); 
      g_data_produced_by_m2.pop_front(); 
      break; 
     } 
     } 
     std::this_thread::sleep_for(100ms); 
    } 

    std::cout << data << std::endl; // render to display 
    std::this_thread::sleep_for(100ms); 

    if (data % 8 == 0) stop = true; // some stop condition for the example 
    } 
} 

int main() 
{ 
    std::thread manipulator1(manipulator1_kernel); 
    std::thread manipulator2(manipulator2_kernel); 
    std::thread visualizer(visualizer_kernel); 

    visualizer.join(); 
    manipulator2.join(); 
    manipulator1.join(); 

    return 0; 
} 

如果你仍然想使用OpenMP的,也許你可以找到最接近的是tasks(因爲OpenMP的3.0,我認爲)。我沒有用他們很擔心,但是上面的程序可以寫成這樣:

int main() 
{ 
    #pragma omp parallel 
    { 
    #pragma omp task 
    manipulator1_kernel(); 
    #pragma omp task 
    manipulator2_kernel(); 
    #pragma omp task 
    visualizer_kernel(); 

    #pragma omp taskwait 
    }  

    return 0; 
} 

的代碼的其餘部分可改爲使用OpenMP的功能太多,但我覺得這回答了你的問題。

這種方法的主要問題是,您必須爲OpenMP parallel中的任務創建一個代碼塊,從而使您的應用程序邏輯和結構的其餘部分變得複雜。

1

要解決此特定問題英特爾®線程構建模塊庫包含特殊結構。 Intel® TBB是跨平臺的庫,它有助於多線程編程。 我們可以在四個不同的任務提供者處查看應用程序中涉及的實體。一種類型的任務是輸入任務 - 那些提供輸入數據的任務,另一種類型的任務由第一個操作例程提供,等等。

因此,用戶需要做的唯一事情就是爲這些任務提供正文。庫中有幾個API用於指定要處理的物體以及如何並行處理。其他一切(這裏我指的是線程創建,任務執行之間的同步,工作平衡等)由庫完成。

我想到的最簡單的解決方案是使用parallel_pipeline函數。這裏是原型:

#include "tbb/pipeline.h" 
using namespace tbb; 

int main() { 
    parallel_pipeline(/*specify max number of bodies executed in parallel, e.g.*/16, 
     make_filter<void, input_data_type>(
      filter::serial_in_order, // read data sequentially 
      [](flow_control& fc) -> input_data_type { 
       if (/*check some stop condition: EOF, etc.*/) { 
        fc.stop(); 
        return input_data_type(); // return dummy value 
       } 
       auto input_data = read_data(); 
       return input_data; 
      } 
     ) & 
     make_filter<input_data_type, manipulator1_output_type>(
      filter::parallel, // process data in parallel by the first manipulator 
      [](input_data_type elem) -> manipulator1_output_type { 
       auto processed_elem = manipulator1::process(elem); 
       return processed_elem; 
      } 
     ) & 
     make_filter<manipulator1_output_type, manipulator2_output_type>(
      filter::parallel, // process data in parallel by the second manipulator 
      [](manipulator1_output_type elem) -> manipulator2_output_type { 
       auto processed_elem = manipulator2::process(elem); 
       return processed_elem; 
      } 
     ) & 
     make_filter<manipulator2_output_type, void>(
      filter::serial_in_order, // visualize frame by frame 
      [](manipulator2_output_type elem) { 
       visualize(elem); 
      } 
     ) 
    ); 
    return 0; 
} 

前提是必需的功能(的read_data,可視化)被實現。這裏input_data_type,manipulator1_output_type等是在流水線階段之間傳遞的類型,並且操縱器的process函數對傳遞的參數進行必要的計算。

順便說一句,爲避免使用鎖和其他同步原語工作,可以使用庫中的concurrent_bounded_queue,並將輸入數據放入此隊列中,可能由不同的線程(例如專用於IO操作)組成,如concurrent_bounded_queue_instance.push(elem),以及然後通過input_data_type elem; concurrent_bounded_queue_instance.pop(elem)閱讀。請注意,在這裏彈出一個項目是一個阻塞操作。 concurrent_queue提供了非阻塞try_pop的替代方案。

另一種可能性是使用tbb::flow_graph及其節點來組織相同的流水線方案。看看描述dependencydata流程圖的兩個示例。您可能需要使用sequencer_node來正確排序項目執行(如有必要)。

這是值得閱讀標記標記SO看看其他人如何使用這個庫。

0

您是否實現了單線程版本?異形?

他們是關鍵步驟,W/O他們,你可以讓你的高度並行設計的最佳實現只是認識到瓶頸是你的緩衝器和/或線程同步和/或假共享和I/O /或緩存未命中或類似問題。

我想先嚐試一個簡單的線程池,其任務順序執行所有步驟。然後分析它的工作原理後,什麼是CPU消耗等我會用更復雜的工具總是比較它們的表現與第一簡版