2014-09-26 56 views
1

我有一個例程是爲了加載和解析文件中的數據。可能需要同時從兩個位置檢索來自同一文件的數據,即在後臺緩存過程和用戶​​請求期間。併發處理數據。我需要注意什麼?

具體來說,我使用的是C++ 11線程和互斥體庫。我們用Visual C++ 11(aka 2012)進行編譯,所以受到它缺少的限制。

我幼稚的做法又是這樣的:

map<wstring,weak_ptr<DataStruct>> data_cache; 
mutex data_cache_mutex; 

shared_ptr<DataStruct> ParseDataFile(wstring file_path) { 
    auto data_ptr = make_shared<DataStruct>(); 

    /* Parses and processes the data, may take a while */ 

    return data_ptr; 
} 

shared_ptr<DataStruct> CreateStructFromData(wstring file_path) { 
    lock_guard<mutex> lock(data_cache_mutex); 

    auto cache_iter = data_cache.find(file_path); 
    if (cache_iter != end(data_cache)) { 
     auto data_ptr = cache_iter->second.lock(); 
     if (data_ptr) 
      return data_ptr; 
     // reference died, remove it 
     data_cache.erase(cache_iter); 
    } 

    auto data_ptr = ParseDataFile(file_path); 

    if (data_ptr) 
     data_cache.emplace(make_pair(file_path, data_ptr)); 

    return data_ptr; 
} 

我的目標是雙重的:

  • 允許多個線程加載單獨的文件同時
  • 確保文件是隻處理一旦

我目前的做法的問題是,它不允許同時解析多個文件。如果我明白會發生什麼,他們每個人都會觸發鎖定並最終以線性方式處理,一次一個線程。它可能會從運行變爲運行線程先通過鎖的順序,但最終結果是相同的。

一個解決方案,我認爲是要建立一個第二個地圖:

map<wstring,mutex> data_parsing_mutex; 

shared_ptr<DataStruct> ParseDataFile(wstring file_path) { 
    lock_guard<mutex> lock(data_parsing_mutex[file_path]); 
    /* etc. */ 
    data_parsing_mutex.erase(file_path); 
} 

但現在我不得不與data_parsing_mutex是如何被更新有關。所以我想我需要另一個互斥體?

map<wstring,mutex> data_parsing_mutex; 
mutex data_parsing_mutex_mutex; 

shared_ptr<DataStruct> ParseDataFile(wstring file_path) { 
    unique_lock<mutex> super_lock(data_parsing_mutex_mutex); 
    lock_guard<mutex> lock(data_parsing_mutex[file_path]); 
    super_lock.unlock(); 

    /* etc. */ 

    super_lock.lock(); 
    data_parsing_mutex.erase(file_path); 
} 

事實上,看着這一點,它不會避免一定雙處理一個文件,如果它沒有被後臺進程,當用戶請求,除非我再次檢查高速緩存完成。

但是到現在爲止我的嗅覺已經在說There must be a better way了。在那兒?期貨,承諾或原子會在這裏幫助我嗎?

+0

據我所知,問題不在'ParseDataFile()'本身?如果從另一個互斥體(在'CreateStructFromData()')中調用它,添加互斥體的意義是什麼 – 2014-09-27 00:15:59

+0

創建了第二個互斥體並將其與傳遞給'ParseDataFile'的每個單獨文件關聯。外部互斥量投影緩存,而內部互斥量則旨在防止兩個線程從同一個文件執行相同的工作。 – 2014-09-27 01:00:26

+1

@MikeE你嘗試過使用[異步](http://en.cppreference.com/w/cpp/thread/async)而不是顯式並行嗎? – Jason 2014-09-27 04:10:55

回答

1

從你所描述的,這聽起來像你試圖做一個形式的使用線程池,以及參考計數緩存DataStruct的延遲初始化。 std::async應該能夠提供很多這樣的事情所需的調度和同步。

使用std::async,代碼看起來像這樣...

map<wstring,weak_ptr<DataStruct>> cache; 
map<wstring,shared_future<shared_ptr<DataStruct>>> pending; 

mutex cache_mutex, pending_mutex; 

shared_ptr<DataStruct> ParseDataFromFile(wstring file) { 
    auto data_ptr = make_shared<DataStruct>(); 

    /* Parses and processes the data, may take a while */ 

    return data_ptr; 

} 


shared_ptr<DataStruct> CreateStructFromData(wstring file) { 
    shared_future<weak_ptr<DataStruct>> pf; 
    shared_ptr<DataStruct> ce; 

    { 
     lock_guard(cache_mutex); 

     auto ci = cache.find(file); 

     if (!(ci == cache.end() || ci->second.expired())) 
      return ci->second.lock(); 
    } 

    { 
     lock_guard(pending_mutex); 

     auto fi = pending.find(file); 

     if (fi == pending.end() || fi.second.get().expired()) { 

      pf = async(ParseDataFromFile, file).share(); 
      pending.insert(fi, make_pair(file, pf)); 

     } else { 

      pf = pi->second; 

     } 
    } 

    pf.wait(); 
    ce = pf.get(); 

    { 
     lock_guard(cache_mutex); 

     auto ci = cache.find(file); 

     if (ci == cache.end() || ci->second.expired()) 
      cache.insert(ci, make_pair(file, ce)); 
    } 

    { 
     lock_guard(pending_mutex); 

     auto pi = pending.find(file); 

     if (pi != pending.end()) 
      pending.erase(pi); 
    } 

    return ce; 

} 

這可能可以優化一點,但一般的想法應該是一樣的。

0

在典型的計算機上,試圖同時加載文件沒有意義,因爲磁盤訪問將成爲瓶頸。相反,最好有一個線程加載文件(或使用異步I/O),並將解析分散到一個線程池中。然後將結果存儲在共享容器中。

關於防止雙工,你應該考慮這是否真的有必要。如果您只是通過不成熟的優化來做到這一點,那麼您可能會通過專注於使程序響應而非高效來讓用戶更快樂。也就是說,確保用戶快速獲得他們所要求的內容,即使這意味着要做雙重工作。

OTOH,如果存在兩次解析文件的技術原因,可以跟蹤每個文件(加載,解析,解析)在共享容器中的狀態。

+0

在這種情況下,它是必要的。該過程所做的大部分工作是在讀取數據後攪動數據。瓶頸不是I/O,幾乎立即結束。我正在實現這一點,因爲切換到另一組數據的延遲是一個問題,它肯定是可以在後臺完成的事情。 – 2014-09-28 00:01:36

相關問題