2015-08-03 70 views
1

我正在製作視頻軟件並使用一些現有的代碼。現有的代碼包含一個循環緩衝區。作爲製片人,我有一臺攝像機和消費者兩個不同的線程。其中一個GLThread使用OpenGL繪製幀,另一個VideoCompressorThread將幀壓縮爲jpeg格式以將其保存爲視頻文件。奇怪的是,目前這兩個線程同時在相同的數據上工作,但這不會產生競爭條件。在GLThread我:jpeg_write_scanlines和glTexImage2D線程安全。爲什麼不這樣崩潰?

while(!shouldStop) { 
     mutex_.lock(); 
     glw_->makeCurrent(); 

     shaderProgram_.bind(); 
     shaderProgram_.setUniformValue("texture", 0); 
     shaderProgram_.setAttributeArray("vertex", vertices_.constData()); 
     shaderProgram_.enableAttributeArray("vertex"); 
     shaderProgram_.setAttributeArray("textureCoordinate", textureCoordinates_.constData()); 
     shaderProgram_.enableAttributeArray("textureCoordinate"); 

     qDebug() << "GLThread: " << "data address: " << static_cast<void*>(imBuf_) << "time: " << QDateTime::currentMSecsSinceEpoch(); 
     glTexImage2D(GL_TEXTURE_2D, 0, GL_RGB8, VIDEO_WIDTH, VIDEO_HEIGHT, 0, GL_RGB, GL_UNSIGNED_BYTE, (GLubyte*)imBuf_); 
     qDebug() << "GLThread finished"; 

     glClear(GL_COLOR_BUFFER_BIT); 
     glDrawArrays(GL_TRIANGLES, 0, 6); 
     glw_->swapBuffers(); 

     shaderProgram_.disableAttributeArray("vertex"); 
     shaderProgram_.disableAttributeArray("textureCoordinate"); 
     shaderProgram_.release(); 

     glw_->doneCurrent(); 
     mutex_.unlock(); 
} 

,並在VideoCompressorThread:

while(!shouldStop) 
{ 
    // JPEG-related stuff 
    struct jpeg_compress_struct cinfo; 
    struct jpeg_error_mgr  jerr; 
    JSAMPROW     row_pointer; 
    unsigned char*    jpgBuf=NULL; 
    unsigned long    jpgBufLen=0; 

    unsigned char*    data; 
    ChunkAttrib     chunkAttrib; 

    // Get raw image from the input buffer 
    data = inpBuf->getChunk(&chunkAttrib); 

    // Initialize JPEG 
    cinfo.err = jpeg_std_error(&jerr); 
    jpeg_create_compress(&cinfo); 
    jpeg_mem_dest(&cinfo, &jpgBuf, &jpgBufLen); 

    // Set the parameters of the output file 
    cinfo.image_width = VIDEO_WIDTH; 
    cinfo.image_height = VIDEO_HEIGHT; 
    cinfo.input_components = 3; 
    cinfo.in_color_space = JCS_RGB; 

    // Use default compression parameters 
    jpeg_set_defaults(&cinfo); 
    jpeg_set_quality(&cinfo, jpgQuality, TRUE); 

    // Do the compression 
    jpeg_start_compress(&cinfo, TRUE); 

    // write one row at a time 
    qDebug() << "VideoCompressorThread: " << "data address: " << static_cast<void*>(data) << "time: " << QDateTime::currentMSecsSinceEpoch(); 
    while(cinfo.next_scanline < cinfo.image_height) 
    { 
     row_pointer = (data + (cinfo.next_scanline * cinfo.image_width * 3)); 
     jpeg_write_scanlines(&cinfo, &row_pointer, 1); 
    } 
    qDebug() << "VideoCompressorThread finished"; 

    // clean up after we're done compressing 
    jpeg_finish_compress(&cinfo); 


    // Insert compressed image into the output buffer 
    chunkAttrib.chunkSize = jpgBufLen; 
    outBuf->insertChunk(jpgBuf, chunkAttrib); 

    // The output buffer needs to be explicitly freed by the libjpeg client 
    free(jpgBuf); 
    jpeg_destroy_compress(&cinfo); 
} 

作爲輸出我得到:

VideoCompressorThread: data address: 0x7fffbdcd1060 time: 1438594694479 
VideoCompressorThread finished 
GLThread: data address: 0x7fffbdcd1060 time: 1438594694488 
GLThread finished 
GLThread: data address: 0x7fffbddb20b0 time: 1438594694497 
GLThread finished 
VideoCompressorThread: data address: 0x7fffbddb20b0 time: 1438594694498 
VideoCompressorThread finished 
VideoCompressorThread: data address: 0x7fffbde93100 time: 1438594694521 
GLThread: data address: 0x7fffbde93100 time: 1438594694521 
GLThread finished 
VideoCompressorThread finished 
VideoCompressorThread: data address: 0x7fffbdf74150 time: 1438594694538 
GLThread: data address: 0x7fffbdf74150 time: 1438594694538 
GLThread finished 
VideoCompressorThread finished 
VideoCompressorThread: data address: 0x7fffbe0551a0 time: 1438594694555 
GLThread: data address: 0x7fffbe0551a0 time: 1438594694555 
GLThread finished 
VideoCompressorThread finished 
VideoCompressorThread: data address: 0x7fffbe1361f0 time: 1438594694571 
GLThread: data address: 0x7fffbe1361f0 time: 1438594694571 
GLThread finished 
VideoCompressorThread finished 
VideoCompressorThread: data address: 0x7fffbe217240 time: 1438594694588 
GLThread: data address: 0x7fffbe217240 time: 1438594694588 
GLThread finished 
VideoCompressorThread finished 
VideoCompressorThread: data address: 0x7fffbe2f8290 time: 1438594694604 
GLThread: data address: 0x7fffbe2f8290 time: 1438594694604 
GLThread finished 
VideoCompressorThread finished 

正如你所看到的,有時兩個線程訪問相同的數據在同一時間,但沒有崩潰。這是純粹的運氣,還是有什麼我不明白在這裏?如果這有什麼不同,我使用Xubuntu 14.04。

編輯。 insertChunk和getChunk()函數。請注意,只有VideoCompressorThread通過getChunk()獲取數據指針。 GLThread連接到chunkReady qt信號。這使得緩衝區可以使用一個主要消費者和多個次要消費者。

void CycDataBuffer::insertChunk(unsigned char* _data, ChunkAttrib &_attrib) 
{ 

    // Check for buffer overflow. CIRC_BUF_MARG is the safety margin against 
    // race condition between consumer and producer threads when the buffer 
    // is close to full. 
    if (buffSemaphore->available() >= bufSize * (1-CIRC_BUF_MARG)) 
    { 
     cerr << "Circular buffer overflow!" << endl; 
     abort(); 
    } 

    // Make sure that the safety margin is at least several (four) times the 
    // chunk size. This is necessary to prevent the race condition between 
    // consumer and producer threads when the buffer is close to full. 
    if(_attrib.chunkSize+sizeof(ChunkAttrib)+MAXLOG > bufSize*MAX_CHUNK_SIZE) 
    { 
     cerr << "The chunk size is too large!" << endl; 
     abort(); 
    } 

    // insert the data into the circular buffer 
    _attrib.isRec = isRec; 

    memcpy(dataBuf + insertPtr, (unsigned char*)(&_attrib), sizeof(ChunkAttrib)); 
    insertPtr += sizeof(ChunkAttrib); 
    buffSemaphore->release(sizeof(ChunkAttrib)); 

    memcpy(dataBuf + insertPtr, _data, _attrib.chunkSize); 
    buffSemaphore->release(_attrib.chunkSize); 

    emit chunkReady(dataBuf + insertPtr); 
    insertPtr += _attrib.chunkSize; 
    if(insertPtr >= bufSize) 
    { 
     insertPtr = 0; 
    } 
} 

unsigned char* CycDataBuffer::getChunk(ChunkAttrib* _attrib) 
{ 
    unsigned char* res; 

    buffSemaphore->acquire(sizeof(ChunkAttrib)); 
    memcpy((unsigned char*)_attrib, dataBuf + getPtr, sizeof(ChunkAttrib)); 
    getPtr += sizeof(ChunkAttrib); 

    buffSemaphore->acquire(_attrib->chunkSize); 
    res = dataBuf + getPtr; 

    getPtr += _attrib->chunkSize; 
    if(getPtr >= bufSize) 
    { 
     getPtr = 0; 
    } 

    return(res); 
} 
+0

好吧,很難說:我看到你在第一個線程中有一個'mutex_.lock();/unlock()'。你確定你沒有在第二個隱藏的互斥鎖(例如在getChunk()或insertChunk())中? – Christophe

+0

我只使用互斥鎖來同步GLThread中的一些OpenGL東西。我沒有編寫getChunk()和insertChunk()的代碼,但我確定我的互斥鎖與它們沒有任何關係。另外,如果某個地方存在隱藏的互斥鎖,那麼輸出是否也應該同步呢?現在,顯然GLThread和VideoCompressorThread在沒有任何同步的情況下同時工作,因此這裏不能涉及任何互斥體? – user2563661

+0

「*同時工作*」:GL線程輸出似乎永遠不會在* data ... *和* finished *之間中斷。你能否在你的代碼中強調你期望出現競賽的地方(即訪問相同的循環緩衝區),並告訴我們如何管理這個緩衝區? – Christophe

回答

1

除了棘輪怪胎的罰款答案「只是因爲它不崩潰並不意味着它不是一個錯誤」,我想補充一點,我實際上沒有看到爲什麼這兩個特定的代碼片段的原因無法並行工作。兩者都訪問相同的圖像數據只讀,這是非常好的。

只有至少有兩個線程(或共享內存的進程)訪問相同的緩衝區並且其中至少有一個正在修改它時,纔會出現併發訪問緩衝區的問題,即通過覆蓋數據,或者通過取消分配緩衝區。

2

只是因爲它不會崩潰並不意味着它不是一個錯誤。另一個線程正在讀取時寫入緩衝區通常會導致數據損壞被讀取線程讀取。將一些字節讀作新值,而另一些則是舊的。

你會看到發生的事情之一是圖像緩衝區的部分被覆蓋,而另一個線程正在處理它,這將導致在觀看視頻時產生screen-tearing。斜線條紋在屏幕上快速移動,可以看到最佳效果。

2線程讀取相同的緩衝區是完全正常的,它是當人們開始寫它時,問題開始。

+0

如果兩個線程只從與我的示例相同的地址讀取,該怎麼辦?這是否允許? – user2563661

+0

@ user2563661只有閱讀完全正常。 (以及存在讀寫鎖的原因) –