2016-01-21 67 views
2

當試圖在linux上實現一個簡單的echo服務器併發支持時。c中多線程服務器的設計

以下方法使用:

  • 使用pthread函數來創建線程池,並保持在一個鏈表。它在進程啓動時創建,並在進程終止時銷燬。
  • 主線程將接受請求,並使用POSIX message queue來存儲接受的套接字文件描述符。
  • 線程池循環從消息隊列中讀取,並處理它獲取的請求,當沒有請求時,它將被阻塞。

該程序現在似乎工作。

的問題是:

  • 是否適合在中間使用message queue,其高效就夠了嗎?
  • 完成需要處理來自多個客戶端的併發請求的線程工具的一般方法是什麼?
  • 如果不合適讓池循環中的線程&塊從消息隊列中檢索msg,那麼如何將請求傳遞給線程?

回答

1

這對我來說似乎沒有那麼複雜。對於多線程服務器通常的做法是:

  • 在一個線程過程中創建一個監聽套接字
  • 接受在一個線程
  • 對於每個接受的客戶端連接的客戶端連接,創建一個新的線程,這接收相應的文件描述符和做的工作
  • 工作線程關閉客戶端連接,當它被完全處理

我沒有看到預先填充線程-p多少好處哦,在這裏。

如果你真的想要一個線程池:

我只想用一個鏈表接受的連接和pthread_mutex同步訪問它:

  • 監聽器進程排隊,客戶端FDS在列表的尾部。
  • 客戶將其出院。

如果隊列爲空,則線程可等待變量(pthread_cond_wait),並在連接可用時由偵聽器進程(pthread_cond_signal)通知。

另一種選擇

根據處理請求的複雜性,這可能是使服務器單線程的,即在處理一個線程的所有連接的選項。這消除了上下文切換,因此可以非常高效。

一個缺點是,只使用一個CPU內核。爲了改善這一點,可以使用混合模型:

  • 爲每個核心創建一個工作線程。
  • 每個線程同時處理n個連接。

然而,你將不得不實施機制在工人之間公平地分配工作。

+0

在添加池之前,舊版本與所描述的相同:爲每個請求創建一個新線程,並在請求完全處理後終止線程。但是,即使創建一個線程並不那麼繁重,但我猜,線程池可能會使流量繁重,因此我正在嘗試實現一個線程池。 –

+0

@EricWang爲什麼你不滿意呢? – Ctx

+0

我更新了評論。在繁忙的流量服務器中,線程池是否會比爲每個請求創建/終止線程提供更好的性能? –

1

除了使用pthread_mutex之外,您還需要使用pthread_cond_t(pthread條件),這將允許您將線程池中的線程置於睡眠狀態,以便它們實際上不在工作。否則,如果他們坐在循環中檢查工作隊列中的某些內容,您將浪費計算週期。

我肯定會考慮使用C++而不是純粹的C.我建議它的原因是,在C++中,您可以使用模板。使用純虛擬基類(讓我們稱之爲:「vtask」),您可以創建模板派生類,它們在調用重載的operator()時接受參數並插入參數,從而在您的任務中允許更多功能:

//============================================================================// 

void* thread_pool::execute_thread() 
{ 
    vtask* task = NULL; 
    while(true) 
    { 
     //--------------------------------------------------------------------// 
     // Try to pick a task 
     m_task_lock.lock(); 
     //--------------------------------------------------------------------// 

     // We need to put condition.wait() in a loop for two reasons: 
     // 1. There can be spurious wake-ups (due to signal/ENITR) 
     // 2. When mutex is released for waiting, another thread can be waken up 
     // from a signal/broadcast and that thread can mess up the condition. 
     // So when the current thread wakes up the condition may no longer be 
     // actually true! 
     while ((m_pool_state != state::STOPPED) && (m_main_tasks.empty())) 
     { 
      // Wait until there is a task in the queue 
      // Unlock mutex while wait, then lock it back when signaled 
      m_task_cond.wait(m_task_lock.base_mutex_ptr()); 
     } 

     // If the thread was waked to notify process shutdown, return from here 
     if (m_pool_state == state::STOPPED) 
     { 
      //m_has_exited. 
      m_task_lock.unlock(); 
      //----------------------------------------------------------------// 
      if(mad::details::allocator_list_tl::get_allocator_list_if_exists() && 
       tids.find(CORETHREADSELF()) != tids.end()) 
       mad::details::allocator_list_tl::get_allocator_list() 
         ->Destroy(tids.find(CORETHREADSELF())->second, 1); 
      //----------------------------------------------------------------// 

      CORETHREADEXIT(NULL); 
     } 

     task = m_main_tasks.front(); 
     m_main_tasks.pop_front(); 
     //--------------------------------------------------------------------// 
     //run(task); 
     // Unlock 
     m_task_lock.unlock(); 
     //--------------------------------------------------------------------// 

     // execute the task 
     run(task); 

     m_task_count -= 1; 
     m_join_lock.lock(); 
     m_join_cond.signal(); 
     m_join_lock.unlock(); 

     //--------------------------------------------------------------------// 
    } 
    return NULL; 
} 

//============================================================================// 

int thread_pool::add_task(vtask* task) 
{ 
#ifndef ENABLE_THREADING 
    run(task); 
    return 0; 
#endif 

    if(!is_alive_flag) 
    { 
     run(task); 
     return 0; 
    } 

    // do outside of lock because is thread-safe and needs to be updated as 
    // soon as possible 
    m_task_count += 1; 

    m_task_lock.lock(); 

    // if the thread pool hasn't been initialize, initialize it 
    if(m_pool_state == state::NONINIT) 
     initialize_threadpool(); 

    // TODO: put a limit on how many tasks can be added at most 
    m_main_tasks.push_back(task); 

    // wake up one thread that is waiting for a task to be available 
    m_task_cond.signal(); 

    m_task_lock.unlock(); 

    return 0; 
} 

//============================================================================// 

void thread_pool::run(vtask*& task) 
{ 
    (*task)(); 

    if(task->force_delete()) 
    { 
     delete task; 
     task = 0; 
    } else { 
     if(task->get() && !task->is_stored_elsewhere()) 
      save_task(task); 
     else if(!task->is_stored_elsewhere()) 
     { 
      delete task; 
      task = 0; 
     } 
    } 
} 

在上面,每個創建的線程運行execute_thread(),直到m_pool_state設置爲state :: STOPPED。您鎖定了m_task_lock,如果狀態不是STOPPED且列表爲空,則將m_task_lock傳遞給您的條件,以使線程進入睡眠狀態並釋放鎖定。你創建任務(未顯示),添加任務(順便說一句,m_task_count是一個原子,這就是爲什麼它是線程安全的)。在添加任務期間,條件被標記爲喚醒一個線程,線程從m_task_lock被獲取並鎖定之後,線程從execute_thread()的m_task_cond.wait(m_task_lock.base_mutex_ptr())節開始。

注意:這是一個高度自定義的實現,它將大部分pthread函數/對象封裝到C++類中,因此複製粘貼將不起作用...對不起。和w.r.t. thread_pool :: run(),除非你擔心返回值,否則(* task)()行就是你所需要的。

我希望這會有所幫助。

編輯:m_join_ *引用用於檢查是否所有任務都已完成。主線程處於類似的有條件等待狀態,檢查是否所有任務都已完成,因爲這對於我在執行之前使用此實現的應用程序是必需的。

+0

是的,與'pthread_cond_t'結合,創建生產者/消費者模型thx很有用。 –

+1

這通常被稱爲工作竊取模型,如果您使用的是C++,那麼最簡單的實現可能只是使用TBB,我會強烈推薦。在比較計算時間的基準實施中,此實現與TBB一樣好,但TBB實現要簡單得多。我們追求這個實現的唯一原因是因爲這個實現幾乎不需要重寫現有的代碼,而實現TBB則需要幾乎完全的重寫(儘管如果我們能夠使用C++ 11 lambda,我們不能)。 – jonrobm

+0

我對C++非常陌生,但已計劃提高技能,當我得到時間時會參考代碼。 –