
When used the way the documentation describes, the cufftSetStream() function causes garbage output. Am I doing something wrong?

Associates a CUDA stream with a cuFFT plan. All kernel launches made during plan execution are now done through the associated stream [... until ...] the stream is changed with another call to cufftSetStream().
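
In other words, the usage pattern the documentation appears to allow is roughly the following minimal sketch (stream and buffer names here are hypothetical, error checking omitted): a single plan whose target stream is switched between executions.

// Sketch of the documented pattern: one plan, re-targeted at a different
// stream before each execution.
cufftHandle plan;
cudaStream_t streamA, streamB;
cufftComplex *inA, *inB;
float *outA, *outB;
int nfft = 1024, nbins = nfft/2 + 1;
cudaStreamCreate(&streamA);
cudaStreamCreate(&streamB);
cudaMalloc((void**)&inA, nbins*sizeof(cufftComplex));
cudaMalloc((void**)&inB, nbins*sizeof(cufftComplex));
cudaMalloc((void**)&outA, nfft*sizeof(float));
cudaMalloc((void**)&outB, nfft*sizeof(float));
cufftPlan1d(&plan, nfft, CUFFT_C2R, 1);  // one batch-1 C2R plan
cufftSetStream(plan, streamA);           // launches now go to streamA ...
cufftExecC2R(plan, inA, outA);
cufftSetStream(plan, streamB);           // ... until the stream is changed again
cufftExecC2R(plan, inB, outB);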

Unfortunately, the results come out as garbage. Below is an example that demonstrates this by performing a sequence of transforms in two ways: once where each stream gets its own dedicated plan, and once where a single plan is reused as the documentation above suggests. The former behaves as expected; the reuse/cufftSetStream approach produces errors in most of the transforms. I have observed this on the two cards I tried (GTX 750 Ti, Titan X) on CentOS 7 Linux, with CUDA compilation tools release 7.0, V7.0.27 and release 7.5, V7.5.17.

EDIT: see the "FIX" comments below for a way to fix the problem.

#include <cufft.h> 
#include <stdexcept> 
#include <iostream> 
#include <numeric> 
#include <vector> 

#define ck(cmd) if (cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);} 


__global__ 
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed) 
{ 
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) 
     for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x) 
      buf[i*stride + j] = make_cuFloatComplex((i+seed)%101 - 50,(j+seed)%41-20); 
} 

__global__ 
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors) 
{ 
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) { 
     for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) { 
      float e=buf1[i*stride+j] - buf2[i*stride+j]; 
      if (e*e > 1) // gross error 
       atomicAdd(errors,1); 
     } 
    } 
} 

void demo(bool reuse_plan) 
{ 
    if (reuse_plan) 
     std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... "; 
    else 
     std::cout << "Giving each stream its own dedicated fft plan ... "; 
    int nfft = 1024; 
    int batch = 1024; 
    int nstreams = 8; 
    int nbins = nfft/2+1; 
    int nit=100; 
    size_t inpitch,outpitch; 

    std::vector<cufftComplex*> inbufs(nstreams); 
    std::vector<float*> outbufs(nstreams); 
    std::vector<float*> checkbufs(nstreams); 
    std::vector<cudaStream_t> streams(nstreams); 
    std::vector<cufftHandle> plans(nstreams); 
    for (int i=0;i<nstreams;++i) { 
     ck(cudaStreamCreate(&streams[i])); 
     ck(cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch)); 
     ck(cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch)); 
     ck(cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch)); 
     if (i==0 || reuse_plan==false) 
      ck (cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch)); 
    } 

    // fill the input buffers and FFT them to get a baseline for comparison 
    for (int i=0;i<nstreams;++i) { 
     fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i); 
     ck (cudaGetLastError()); 
     if (reuse_plan) { 
      ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i])); 
     }else{ 
      ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i])); 
      ck(cufftSetStream(plans[i],streams[i])); // only need to set the stream once 
     } 
     ck(cudaDeviceSynchronize()); 
    } 
    // allocate a buffer for the error count 
    int * errors; 
    ck(cudaMallocHost((void**)&errors,sizeof(int)*nit)); 
    memset(errors,0,sizeof(int)*nit); 

    /* FIX: an event can protect the plan internal buffers 
    by serializing access to the plan 
    cudaEvent_t ev; 
    cudaEventCreateWithFlags(&ev,cudaEventDisableTiming); 
    */ 

    // perform the FFTs and check the outputs on streams 
    for (int it=0;it<nit;++it) { 
     int k = it % nstreams; 
     ck(cudaStreamSynchronize(streams[k])); // make sure any prior kernels have completed 
     if (reuse_plan) { 
      // FIX: ck(cudaStreamWaitEvent(streams[k],ev,0)); 
      ck(cufftSetStream(plans[0],streams[k])); 
      ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k])); 
      // FIX: ck(cudaEventRecord(ev,streams[k])); 
     }else{ 
      ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k])); 
     } 
     check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]); 
     ck (cudaGetLastError()); 
    } 
    ck(cudaDeviceSynchronize()); 

    // report number of errors 
    int errcount=0; 
    for (int it=0;it<nit;++it) 
     if (errors[it]) 
      ++errcount; 
    std::cout << errcount << " of " << nit << " transforms had errors\n"; 

    for (int i=0;i<nstreams;++i) { 
     cudaFree(inbufs[i]); 
     cudaFree(outbufs[i]); 
     cudaStreamDestroy(streams[i]); 
     if (i==0 || reuse_plan==false) 
      cufftDestroy(plans[i]); 
    } 
} 

int main(int argc,char ** argv) 
{ 
    demo(false); 
    demo(true); 
    return 0; 
} 

Typical output:

Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors
Reusing the same fft plan with multiple stream via cufftSetStream ... 87 of 100 transforms had errors

When I compile and run the code you posted on a very modest mobile GPU with CUDA 7.0, I get 0 errors in both cases. – talonmies

@talonmies, thanks for the data point. I just tried it with cuda 7.0 as well - it also fails (see edit). Perhaps the card's "modesty" keeps it from failing (i.e. fewer resources == fewer race conditions). What OS are you on? –

Windows 10 with a compute capability 2.1 device – talonmies

Answer

In order to reuse a single plan the way you want, you need to manage the cuFFT work area manually.

Each plan has space for intermediate computation results. If you want to have two or more different plan executions in flight at the same time using the same plan handle, you need to provide a temporary buffer for each concurrent cufftExec* call.

You can do this with cufftSetWorkArea - take a look at section 3.7 of the cuFFT documentation. Section 2.2 also helps in understanding how it works.
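
In isolation, the relevant calls look roughly like the following sketch (the plan, stream and buffer names refer to the variables in the full listing below):

// Sketch: one scratch (work area) buffer per stream for a single shared plan.
size_t ws;
ck(cufftGetSize(plans[0], &ws));              // how much scratch the plan needs
ck(cufftSetAutoAllocation(plans[0], 0));      // stop using the plan's single built-in buffer
for (int i = 0; i < nstreams; ++i)
 ck(cudaMalloc(&(wk_areas[i]), ws));          // independent work area per stream
// ... then, before each execution on stream k:
ck(cufftSetStream(plans[0], streams[k]));
ck(cufftSetWorkArea(plans[0], wk_areas[k]));  // scratch space private to this stream
ck(cufftExecC2R(plans[0], inbufs[k], outbufs[k]));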

Below is a complete worked example of your code with this change:

$ cat t1241.cu 
#include <cufft.h> 
#include <stdexcept> 
#include <iostream> 
#include <numeric> 
#include <vector> 

#define ck(cmd) if (cmd) { std::cerr << "error at line " << __LINE__ << std::endl;exit(1);} 


__global__ 
void fill_input(cufftComplex * buf, int batch,int nbins,int stride,int seed) 
{ 
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) 
     for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nbins;j += gridDim.x*blockDim.x) 
      buf[i*stride + j] = make_cuFloatComplex((i+seed)%101 - 50,(j+seed)%41-20); 
} 

__global__ 
void check_output(const float * buf1,const float * buf2,int batch, int nfft, int stride, int * errors) 
{ 
    for (int i = blockDim.y * blockIdx.y + threadIdx.y; i< batch;i += gridDim.y*blockDim.y) { 
     for (int j = blockDim.x * blockIdx.x + threadIdx.x; j< nfft;j += gridDim.x*blockDim.x) { 
      float e=buf1[i*stride+j] - buf2[i*stride+j]; 
      if (e*e > 1) // gross error 
       atomicAdd(errors,1); 
     } 
    } 
} 

void demo(bool reuse_plan) 
{ 
    if (reuse_plan) 
     std::cout << "Reusing the same fft plan with multiple stream via cufftSetStream ... "; 
    else 
     std::cout << "Giving each stream its own dedicated fft plan ... "; 
    int nfft = 1024; 
    int batch = 1024; 
    int nstreams = 8; 
    int nbins = nfft/2+1; 
    int nit=100; 
    size_t inpitch,outpitch; 

    std::vector<cufftComplex*> inbufs(nstreams); 
    std::vector<float*> outbufs(nstreams); 
    std::vector<float*> checkbufs(nstreams); 
    std::vector<cudaStream_t> streams(nstreams); 
    std::vector<cufftHandle> plans(nstreams); 
    // if plan reuse, set up independent work areas 
    std::vector<char *> wk_areas(nstreams); 
    for (int i=0;i<nstreams;++i) { 
     ck(cudaStreamCreate(&streams[i])); 
     ck(cudaMallocPitch((void**)&inbufs[i],&inpitch,nbins*sizeof(cufftComplex),batch)); 
     ck(cudaMallocPitch((void**)&outbufs[i],&outpitch,nfft*sizeof(float),batch)); 
     ck(cudaMallocPitch((void**)&checkbufs[i],&outpitch,nfft*sizeof(float),batch)); 
     if (i==0 || reuse_plan==false) 
      ck (cufftPlanMany(&plans[i],1,&nfft,&nbins,1,inpitch/sizeof(cufftComplex),&nfft,1,outpitch/sizeof(float),CUFFT_C2R,batch)); 
    } 
    if (reuse_plan){ 
     size_t ws; 
     ck(cufftGetSize(plans[0], &ws)); 
     for (int i = 0; i < nstreams; i++) 
      ck(cudaMalloc(&(wk_areas[i]), ws)); 
     ck(cufftSetAutoAllocation(plans[0], 0)); 
     ck(cufftSetWorkArea(plans[0], wk_areas[0])); 
    } 
    // fill the input buffers and FFT them to get a baseline for comparison 
    for (int i=0;i<nstreams;++i) { 
     fill_input<<<20,dim3(32,32)>>>(inbufs[i],batch,nbins,inpitch/sizeof(cufftComplex),i); 
     ck (cudaGetLastError()); 
     if (reuse_plan) { 
      ck (cufftExecC2R(plans[0],inbufs[i],checkbufs[i])); 
     }else{ 
      ck (cufftExecC2R(plans[i],inbufs[i],checkbufs[i])); 
      ck(cufftSetStream(plans[i],streams[i])); // only need to set the stream once 
     } 
     ck(cudaDeviceSynchronize()); 
    } 
    // allocate a buffer for the error count 
    int * errors; 
    ck(cudaMallocHost((void**)&errors,sizeof(int)*nit)); 
    memset(errors,0,sizeof(int)*nit); 

    // perform the FFTs and check the outputs on streams 
    for (int it=0;it<nit;++it) { 
     int k = it % nstreams; 
     ck(cudaStreamSynchronize(streams[k])); // make sure any prior kernels have completed 
     if (reuse_plan) { 
      ck(cufftSetStream(plans[0],streams[k])); 
      ck(cufftSetWorkArea(plans[0], wk_areas[k])); // update work area pointer in plan 
      ck(cufftExecC2R(plans[0],inbufs[k],outbufs[k])); 
     }else{ 
      ck(cufftExecC2R(plans[k],inbufs[k],outbufs[k])); 
     } 
     check_output<<<100,dim3(32,32),0,streams[k]>>>(outbufs[k],checkbufs[k],batch,nfft,outpitch/sizeof(float),&errors[it]); 
     ck (cudaGetLastError()); 
    } 
    ck(cudaDeviceSynchronize()); 

    // report number of errors 
    int errcount=0; 
    for (int it=0;it<nit;++it) 
     if (errors[it]) 
      ++errcount; 
    std::cout << errcount << " of " << nit << " transforms had errors\n"; 

    for (int i=0;i<nstreams;++i) { 
     cudaFree(inbufs[i]); 
     cudaFree(outbufs[i]); 
     cudaFree(wk_areas[i]); 
     cudaStreamDestroy(streams[i]); 
     if (i==0 || reuse_plan==false) 
      cufftDestroy(plans[i]); 
    } 
} 

int main(int argc,char ** argv) 
{ 
    demo(false); 
    demo(true); 
    return 0; 
} 
$ nvcc -o t1241 t1241.cu -lcufft 
$ ./t1241 
Giving each stream its own dedicated fft plan ... 0 of 100 transforms had errors 
Reusing the same fft plan with multiple stream via cufftSetStream ... 0 of 100 transforms had errors 
$ 
I accepted this answer because it gets to the heart of the problem and offers a way to solve it: the plan's internal work buffer is not safe to use from concurrent streams. In my opinion, serializing access to the plan with an event is a better solution. Plan executions at these sizes keep the device very busy, so serializing the plan executions is not expensive in itself, and the extra synchronization cost appears to be negligible. This keeps the single-plan memory footprint while avoiding the garbage output. See the "FIX" comments in my updated code. –
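
For reference, the event-based alternative described here (the "FIX" comments in the question's code) boils down to roughly this sketch: a single plan with its built-in work area, plus an event so that only one execution touches that scratch space at a time.

// Sketch: serialize use of the shared plan's internal work area across streams.
cudaEvent_t ev;
ck(cudaEventCreateWithFlags(&ev, cudaEventDisableTiming));
for (int it = 0; it < nit; ++it) {
 int k = it % nstreams;
 ck(cudaStreamWaitEvent(streams[k], ev, 0));  // wait until the previous plan execution is done
 ck(cufftSetStream(plans[0], streams[k]));
 ck(cufftExecC2R(plans[0], inbufs[k], outbufs[k]));
 ck(cudaEventRecord(ev, streams[k]));         // mark the end of this execution
}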

Is the extra memory footprint really that much of a burden? – llukas

Yes. In my application there can be hundreds of "channels", each requiring a different FFT size. I'd rather not multiply the memory requirements by the number of streams. –