2017-09-04

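Worker thread pause/resume implementation (C++1y / VS2015)

While trying to add pause/resume functionality to my Worker [thread] class, I've run into an issue that I can't explain.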

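The problem looks like a deadlock. However, I can't reproduce it when a debugger is attached and a breakpoint is set before a certain point (see #1), so it looks like a timing issue.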

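The fix I found (#2) doesn't make much sense to me. It means holding the mutex for longer, and client code might try to acquire other mutexes while it is held, which I know increases the chance of a deadlock.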

It does, however, fix the problem.

Worker loop:

Job* job;
while (true)
{
    {
        std::unique_lock<std::mutex> lock(m_jobsMutex);
        m_workSemaphore.Wait(lock);

        if (m_jobs.empty() && m_finishing)
        {
            break;
        }

        // Take the next job
        ASSERT(!m_jobs.empty());
        job = m_jobs.front();
        m_jobs.pop_front();
    }

    bool done = false;
    bool wasSuspended = false;
    do
    {
        // #2
        { // Removing this extra scoping seemingly fixes the issue, BUT it means
          // we hold on to m_suspendMutex while the job is Process()ing,
          // which might 1. be lengthy, 2. acquire other locks.
            std::unique_lock<std::mutex> lock(m_suspendMutex);
            if (m_isSuspended && !wasSuspended)
            {
                job->Suspend();
            }
            wasSuspended = m_isSuspended;

            m_suspendCv.wait(lock, [this] {
                return !m_isSuspended;
            });

            if (wasSuspended && !m_isSuspended)
            {
                job->Resume();
            }
            wasSuspended = m_isSuspended;
        }

        done = job->Process();
    }
    while (!done);
}

Suspend/Resume are simply:

void Worker::Suspend() 
{ 
    std::unique_lock<std::mutex> lock(m_suspendMutex); 
    ASSERT(!m_isSuspended); 
    m_isSuspended = true; 
} 

void Worker::Resume() 
{ 
    {
        std::unique_lock<std::mutex> lock(m_suspendMutex);
        ASSERT(m_isSuspended);
        m_isSuspended = false;
    }
    m_suspendCv.notify_one(); // notify_all() doesn't work either. 
} 
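For comparison, here is a minimal sketch of the same suspend/resume gate factored into its own class. It assumes a hypothetical `SuspendGate` (the class and its method names are illustrative, not part of the original Worker). The mutex is held only while checking the flag and waiting, and `Resume()` notifies after releasing it, so whatever the caller does next runs without the lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical helper bundling the m_suspendMutex / m_suspendCv / m_isSuspended trio.
class SuspendGate {
public:
    void Suspend() {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(!m_isSuspended);
        m_isSuspended = true;
    }

    void Resume() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            assert(m_isSuspended);
            m_isSuspended = false;
        }
        // Notify after unlocking so the woken thread can take the mutex at once.
        m_cv.notify_all();
    }

    // Blocks while suspended; returns true if the caller actually had to wait.
    // The predicate overload re-checks the flag under the lock, which handles
    // spurious wakeups and a Resume() that happens before the wait starts.
    bool WaitWhileSuspended() {
        std::unique_lock<std::mutex> lock(m_mutex);
        const bool waited = m_isSuspended;
        m_cv.wait(lock, [this] { return !m_isSuspended; });
        return waited;
    }   // lock released here, before the caller does any work

    bool IsSuspended() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_isSuspended;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_isSuspended = false;
};
```

With such a gate, the scoped block marked #2 would reduce to a call to `gate.WaitWhileSuspended()` followed by `done = job->Process();`, with `Process()` running outside the lock. The `job->Suspend()`/`job->Resume()` callbacks are left out of this sketch.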

The (Visual Studio) test:

struct Job: Worker::Job
{
    int durationMs = 25;
    int chunks = 40;
    int executed = 0;

    bool Process()
    {
        auto now = std::chrono::system_clock::now();
        auto until = now + std::chrono::milliseconds(durationMs);
        while (std::chrono::system_clock::now() < until)
        { /* busy, busy */
        }

        ++executed;
        return executed < chunks;
    }

    void Suspend() { /* nothing here */ }
    void Resume() { /* nothing here */ }
};

auto worker = std::make_unique<Worker>();

Job j;
worker->Enqueue(j);

std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs)); // Wait at least one chunk.

worker->Suspend();

Assert::IsTrue(j.executed < j.chunks); // We've suspended before we finished.
const int testExec = j.executed;

std::this_thread::sleep_for(std::chrono::milliseconds(j.durationMs * 4));

Assert::IsTrue(j.executed == testExec); // We haven't moved on.

// #1
worker->Resume(); // Breaking before this call means that I won't see the issue.
worker->Finalize();

Assert::IsTrue(j.executed == j.chunks); // Now we've finished.

What am I missing or doing wrong? Why would the job's Process() need to be guarded by the suspend mutex?

EDIT: Resume() shouldn't have been holding the mutex while notifying; that's fixed, but the problem persists.

Answer


Of course, the job's Process() does not need to be guarded by the suspend mutex.

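Access to j.executed, however, both for the asserts and for the increment, does need to be synchronized, either by making it a std::atomic<int> or by guarding it with a mutex. As it stands, the worker thread increments it while the test thread reads it, which is a data race.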
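A minimal sketch of that fix, assuming a hypothetical `CountingJob` standing in for the test's `Job` and a hypothetical `RunOnWorkerThread` helper in place of the real Worker. The only change that matters is that `executed` is a `std::atomic<int>`, so the test thread can read it while the worker thread increments it. Note also that, as posted, the test's `Process()` returns `executed < chunks` (true while work remains), whereas the worker loop treats a `true` return as done; the sketch follows the worker loop's convention.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for the test's Job. With a plain int, the concurrent
// read on the test thread and increment on the worker thread is a data race (UB).
struct CountingJob {
    int durationMs = 1;
    int chunks = 40;
    std::atomic<int> executed{0};

    // Returns true once all chunks have run, matching `done = job->Process()`.
    bool Process() {
        assert(executed < chunks);  // never called again after completion
        // Sleeps instead of busy-waiting; the exact timing is irrelevant here.
        std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
        return ++executed >= chunks;  // atomic increment, then compare new value
    }
};

// Hypothetical helper: runs the job on its own thread until Process() reports done.
inline std::thread RunOnWorkerThread(CountingJob& job) {
    return std::thread([&job] {
        bool done = false;
        while (!done) {
            done = job.Process();
        }
    });
}
```

Guarding `executed` with a `std::mutex` would work equally well; the atomic is simply the smaller change to the test.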

It remains unclear why the problem manifested the way it did (since I'm not writing to the variable on the main thread); it may be a case of undefined behaviour propagating backwards in time.