2014-09-01 54 views
1
查找未知尺寸的區域的最大值在Array

說我有一個包含所有不同數目[45,21,764,234,7,0,12,55,...]使用CUDA

然後,我有另一個數組B[4000]與表示在陣列A區域的位置值的陣列A[4000]如果它是區域的一部分,則爲1,如果不是,則爲0。如果1's彼此相鄰,則表示它們是同一區域的一部分,如果它們彼此不相鄰(1's之間有0),則它們是不同區域的一部分。

ex。 B = [1,1,1,0,1,1,0,0...]就是說我想找到在first three numbers in array A的區域中的最大值,並且在5th and 6th numbers in array A, etc. 所以,我可以產生保持在每個由B表示的區域中的A最大值的數組C[4000]的最大數量,和一個0在不是地區的一部分的區域。

因此,在這種情況下C = [764,764,764,0,7,7,0,0...]

可以有從0 to 2,000 regions在任何地方,和區域的長度可以從2 to 4,000 numbers long範圍。我從來不知道有多少區域或區域的大小不同。

我一直在想出一個可以實現這個結果的CUDA內核。它需要儘可能快地完成,因爲它實際上將被用於圖像,這只是一個簡單的例子。我所有的想法,比如使用簡化,只在只有一個區域跨越所有4000數字A時才起作用。然而,我不認爲我可以在這裏使用縮減,因爲可能有多個區域由13996之間的空格(0's)分隔,陣列中的縮減會導致我分散區域的跟蹤。或者,內核有太多的循環,如果在它的語句要快如

int intR = 0; 
while(B[blockIdx.x * blockDim.x + threadIdx.x + intR] > 0){ 
    intMaxR = intMaxR < A[blockIdx.x * blockDim.x + threadIdx.x + intR] ? A[blockIdx.x * blockDim.x + threadIdx.x + intR] : intMaxR; 
    intR++; 
} 

int intL = 0; 
while(B[blockIdx.x * blockDim.x + threadIdx.x - intL] > 0){ 
    intMaxL = intMaxL < A[blockIdx.x * blockDim.x + threadIdx.x - intL] ? A[blockIdx.x * blockDim.x + threadIdx.x + intL] : intMaxL; 
    intL++; 
} 

intMax = intMaxR > intMaxL ? intMaxR : intMaxL; 

for(int i = 0; i < intR; i++){ 
    C[blockIdx.x * blockDim.x + threadIdx.x + i] = intMax; 
} 
for(int i = 0; i < intL; i++){ 
    C[blockIdx.x * blockDim.x + threadIdx.x - i] = intMax; 
} 

顯然代碼甚至共享內存慢,是不是真正得到了CUDA的並行性質的優勢。有沒有人有任何想法如何或如果這可以在CUDA中有效地完成?

在此先感謝。

+1

您可以使用[thrust](https://github.com/thrust/thrust/wiki/Quick-Start-Guide)函數[reduce_by_key](http://thrust.github.io/doc /group__reductions.html#ga1fd25c0e5e4cc0a6ab0dcb1f7f13a2ad)來幫助解決這個問題。 – 2014-09-01 12:07:25

+0

這將有助於找到區域的最大值,但是我是不是仍然會丟失我的位置,或者不得不基本遍歷整個陣列來填充區域不平行的區域 – user2719805 2014-09-01 13:27:24

+0

是的,我沒有暗示這是一個完整的解決方案。我已經添加了一個答案,說明如何在完整的解決方案中使用它。 – 2014-09-01 15:02:07

回答

3

一種可能的方法是使用thrust

一個可能的序列將是這樣的:

  1. 使用 thrust::reduce_by_key 以產生用於每個範圍的最大值。
  2. 使用thrust :: adjacent_difference描繪每個範圍的起始點
  3. 對步驟2的結果使用包含式掃描來生成聚集索引,即將用於選擇縮小值的索引(來自步驟1)將會出現在輸出向量的每個位置。
  4. 使用thrust::gather_if使用步驟3中生成的聚集索引,選擇性地將縮小的值放置到輸出向量中的適當位置(B向量中存在1)。

這裏的一個充分的工作代碼演示此,使用像您的示例A和B載體:

關於示例
#include <iostream> 
#include <thrust/device_vector.h> 
#include <thrust/adjacent_difference.h> 
#include <thrust/reduce.h> 
#include <thrust/copy.h> 
#include <thrust/transform_scan.h> 
#include <thrust/iterator/discard_iterator.h> 
#include <thrust/iterator/transform_iterator.h> 
#include <thrust/functional.h> 

#define DSIZE 8 

template <typename T> 
struct abs_val : public thrust::unary_function<T, T> 
{ 
    __host__ __device__ 
    T operator()(const T& x) const 
    { 
    if (x<0) return -x; 
    else return x; 
    } 
}; 

template <typename T> 
struct subtr : public thrust::unary_function<T, T> 
{ 
    const T val; 
    subtr(T _val): val(_val) {} 
    __host__ __device__ 
    T operator()(const T& x) const 
    { 
    return x-val; 
    } 
}; 

int main(){ 

    int A[DSIZE] = {45,21,764,234,7,0,12,55}; 
    int B[DSIZE] = {1,1,1,0,1,1,0,0}; 
    thrust::device_vector<int> dA(A, A+DSIZE); 
    thrust::device_vector<int> dB(B, B+DSIZE); 
    thrust::device_vector<int> dRed(DSIZE); 
    thrust::device_vector<int> diffB(DSIZE); 
    thrust::device_vector<int> dRes(DSIZE); 

    thrust::reduce_by_key(dB.begin(), dB.end(), dA.begin(), thrust::make_discard_iterator(), dRed.begin(), thrust::equal_to<int>(), thrust::maximum<int>()); 
    thrust::adjacent_difference(dB.begin(), dB.end(), diffB.begin()); 
    thrust::transform_inclusive_scan(diffB.begin(), diffB.end(), diffB.begin(), abs_val<int>(), thrust::plus<int>()); 
    thrust::gather_if(thrust::make_transform_iterator(diffB.begin(), subtr<int>(B[0])), thrust::make_transform_iterator(diffB.end(), subtr<int>(B[0])), dB.begin(), dRed.begin(), dRes.begin()); 
    thrust::copy(dRes.begin(), dRes.end(), std::ostream_iterator<int>(std::cout, " ")); 
    std::cout << std::endl; 
    return 0; 
} 

注:

  1. reduce_by_key正在產生降低的值(最大值),用於每個 連續的0序列 B中的1個序列。您只需真正需要 這1個序列的最大值。我們將通過gather_if函數丟棄最大值爲0的序列 。
  2. 我允許該B載體可以與任一 1個序列或0的序列開始,通過使用transform_iterator 處理步驟2的矢量結果,減去從每個B載體的第一 值的可能性收集索引。
  3. adjacent_difference操作將產生1或-1到 劃定新序列的開始。爲了掃描目的(即生成聚集索引),我使用帶有abs_val函子的變換包含變換函數variant_inclusive_scan來平等對待它們。
  4. 上面的代碼應該產生的結果符合期望C輸出向量,就像這樣:

    $ nvcc -arch=sm_20 -o t53 t53.cu 
    $ ./t53 
    764 764 764 0 7 7 0 0 
    $ 
    

我們可以使用thrust::placeholders進一步簡化上面的代碼,省去了多餘的仿函數定義的需要:

#include <iostream> 
#include <thrust/device_vector.h> 
#include <thrust/adjacent_difference.h> 
#include <thrust/reduce.h> 
#include <thrust/copy.h> 
#include <thrust/transform_scan.h> 
#include <thrust/iterator/discard_iterator.h> 
#include <thrust/iterator/transform_iterator.h> 
#include <thrust/functional.h> 

#define DSIZE 2000000 
using namespace thrust::placeholders; 

typedef int mytype; 

int main(){ 

    mytype *A = (mytype *)malloc(DSIZE*sizeof(mytype)); 
    int *B = (int *)malloc(DSIZE*sizeof(int)); 
    for (int i = 0; i < DSIZE; i++){ 
    A[i] = (rand()/(float)RAND_MAX)*10.0f; 
    B[i] = rand()%2;} 
    thrust::device_vector<mytype> dA(A, A+DSIZE); 
    thrust::device_vector<int> dB(B, B+DSIZE); 
    thrust::device_vector<mytype> dRed(DSIZE); 
    thrust::device_vector<int> diffB(DSIZE); 
    thrust::device_vector<mytype> dRes(DSIZE); 

    cudaEvent_t start, stop; 
    cudaEventCreate(&start); 
    cudaEventCreate(&stop); 
    cudaEventRecord(start); 
    thrust::reduce_by_key(dB.begin(), dB.end(), dA.begin(), thrust::make_discard_iterator(), dRed.begin(), thrust::equal_to<mytype>(), thrust::maximum<mytype>()); 
    thrust::adjacent_difference(dB.begin(), dB.end(), diffB.begin()); 
    thrust::transform_inclusive_scan(diffB.begin(), diffB.end(), diffB.begin(), _1*_1, thrust::plus<int>()); 
    thrust::gather_if(thrust::make_transform_iterator(diffB.begin(), _1 - B[0]), thrust::make_transform_iterator(diffB.end(), _1 - B[0]), dB.begin(), dRed.begin(), dRes.begin()); 
    cudaEventRecord(stop); 
    cudaEventSynchronize(stop); 
    float et; 
    cudaEventElapsedTime(&et, start, stop); 
    std::cout<< "elapsed time: " << et << "ms " << std::endl; 
    thrust::copy(dRes.begin(), dRes.begin()+10, std::ostream_iterator<mytype>(std::cout, " ")); 
    std::cout << std::endl; 
    return 0; 
} 

(I已經修改了上述佔位符代碼還包括生成一個較大尺寸的數據集的,以及一些基本定時APPA鼠標)

+0

這段代碼絕對有效,它是一個很好的答案。但是,我終於有機會測試此代碼的時間,並且實際上在圖像上使用速度太慢。有沒有可能的方法來加速? – user2719805 2014-09-02 18:32:21

+1

我調升'DSIZE'到2000000(即,對於1920×1080的圖像的代理),生成的任意的數據,以及包裹圍繞'4個鍵推力呼叫cudaEvent'定時(減少通過聚集)和定時爲〜上的2.5ms的K40,在C2075上約3ms,在Quadro NVS 310(cc2.1,1 SM,即小型GPU)上約爲20ms。 60fps的將需要小於16毫秒每幀進行處理,但不包括數據傳送時間或其他開銷(這大概可以被流水線化)。除了使用快速GPU之外,我沒有立即提出加快速度的建議。 – 2014-09-02 19:26:32

+0

我修改了我的答案中顯示的代碼的佔位符版本,以演示上面評論中討論的時間和分析。 – 2014-09-02 19:35:20