2011-12-02 88 views
4

[編輯:感謝MSalters的回答和Raymond Chen的回答InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection,問題解決了,下面的代碼工作正常。這應該提供一個在Windows中使用線程池的有趣簡單示例]Windows API線程池簡單示例

我無法找到以下任務的簡單示例。例如,我的程序需要將一個巨大的std :: vector中的值加1,所以我想要並行執行。它需要在程序的整個生命週期內進行很多次。我知道如何在每次調用例程時使用CreateThread,但是我沒有設法使用ThreadPool去除CreateThread。

這裏是我做的:

class Thread { 
public: 
    Thread(){} 
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread" 
}; 
class IncrementVectorThread: public Thread { 
public: 
    IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { }; 

    virtual void run() { 
     for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++) 
      myvec[i]++; //and let's assume myvec is properly sized 
    } 
    int id, nb; 
    std::vector<int> &myvec; 
}; 

class ThreadGroup : public std::vector<Thread*> { 
public: 
    ThreadGroup() { 
     pool = CreateThreadpool(NULL); 
     InitializeThreadpoolEnvironment(&cbe); 
     cleanupGroup = CreateThreadpoolCleanupGroup(); 
     SetThreadpoolCallbackPool(&cbe, pool); 
     SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL); 
     threadCount = 0; 
    } 
    ~ThreadGroup() { 
     CloseThreadpool(pool); 
} 
    PTP_POOL pool; 
    TP_CALLBACK_ENVIRON cbe; 
    PTP_CLEANUP_GROUP cleanupGroup; 
    volatile long threadCount; 
} ; 


static VOID CALLBACK runFunc(
       PTP_CALLBACK_INSTANCE Instance, 
       PVOID Context, 
       PTP_WORK Work) { 

    ThreadGroup &thread = *((ThreadGroup*) Context); 
    long id = InterlockedIncrement(&(thread.threadCount)); 
    DWORD tid = (id-1)%thread.size(); 
    thread[tid]->run(); 
} 

void run_threads(ThreadGroup* thread_group) { 
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size()); 
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size()); 

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe); 
    thread_group->threadCount = 0; 
    for (int i=0; i<thread_group->size(); i++) { 
     SubmitThreadpoolWork(worker); 
    } 
    WaitForThreadpoolWorkCallbacks(worker,FALSE); 
    CloseThreadpoolWork(worker); 
}  

void main() { 

    ThreadGroup group; 
    std::vector<int> vec(10000, 0); 
    for (int i=0; i<10; i++) 
     group.push_back(new IncrementVectorThread(i, 10, vec)); 

    run_threads(&group); 
    run_threads(&group); 
    run_threads(&group); 

    // now, vec should be == std::vector<int>(10000, 3);  
} 

所以,如果我深知:
- 命令CreateThreadpool創建了一堆線程(因此,調用CreateThreadpoolWork是便宜的,因爲它不調用CreateThread)
- 我可以擁有儘可能多的線程池(如果我想爲「IncrementVector」創建一個線程池,併爲我的「DecrementVector」線程創建一個線程池)。
- 如果我需要將我的「增量向量」任務分成10個線程,而不是調用10次CreateThread,我創建一個「worker」,並使用相同的參數向ThreadPool提交10次(因此,我需要回調中的線程ID來知道std :: vector的哪個部分要增加)。在這裏我找不到線程ID,因爲函數GetCurrentThreadId()返回線程的真實ID(即,類似1528的東西,而不是0..nb_launched_threads之間的東西)。

最後,我不確定我是否理解這個概念:如果我將std :: vector分成10個線程,我真的需要單個worker而不是10嗎?

謝謝!

回答

5

你基本上直到最後一點。

關於線程池的整個想法是,你不關心它有多少個線程。您只需將大量工作投入到線程池中,並讓OS確定如何執行每個塊。 因此,如果您創建並提交了10個塊,操作系統可能會使用池中的1到10個線程。

你不應該關心那些線程標識。不要打擾線程ID,最小或最大線程數或類似的東西。

如果您不關心線程標識,那麼您如何管理要更改的向量的哪一部分?簡單。在創建線程池之前,將計數器初始化爲零。在回調函數中,調用InterlockedIncrement來檢索和增加計數器。對於每個提交的工作項目,您將獲得一個連續的整數。

+0

謝謝 - upvoted!我做了你的建議,但是用上面的簡單代碼,最後我仍然沒有在向量中得到適當的值:s – WhitAngl

+0

答案重新驗證:計數器沒有在正確的位置遞增:)謝謝! – WhitAngl

+0

對不起,我未驗證以重新打開該問題。上面的代碼在10000次執行中約有7次未能給出只包含「3」的向量:s – WhitAngl