2014-10-21 97 views
0

我想實現一個線程安全的隊列映射。線程安全隊列映射

我打算從空地圖開始。如果該鍵不存在,我想用新的隊列創建一個新的Map條目。如果密鑰確實存在,我想添加到隊列中。我的建議的實施情況如下:

import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 

public class StackOverFlowExample { 

    private final Map<String, ConcurrentLinkedQueue<String>> map = new ConcurrentHashMap<>(); 

    public void addElementToQueue(String key, String value){ 
     if (map.containsKey(key)){ 
      map.get(key).add(value); 
     } 
     else{ 
      ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); 
      queue.add(value); 
      map.put(key, queue); 
     }   
    }  
} 

我擔心的是,當多個線程試圖將一個新值添加到地圖時,首先會放置一個新的隊列中的新地圖項,第二等待,然後爲該密鑰添加一個新的隊列,而不是添加到隊列中。我的併發/併發API知識很少。或許併發性是爲了避免這種情況?建議將不勝感激。

回答

2

這種模式可能已經發布多次對SO(有效地增加了併發圖):

Queue<String> q = map.get(key); 
if(q == null) { 
    q = new ConcurrentLinkedQueue<String>(); 
    Queue<String> curQ = map.putIfAbsent(key, q); 
    if(curQ != null) { 
    q = curQ; 
    } 
} 
q.add(value); 
0

所以你的擔心是,線程A和線程B將執行以下操作:

thread A: lock ConcurrentHashMap Look for Queue "x" (not found) unlock ConcurrentHashMap create Queue "x" lock ConcurrentHashMap Insert Queue X unlock ConcurrentHashMap Thread B: Lock ConcurrentHashMap (while thread A is in 'create Queue X') look for queue X (not found) unlock ConcurrentHashMap (thread A then gets lock) create Queue "x" v2 lock ConcurrentHashMap Insert Queue X v2 (overwriting the old entry) unlock ConcurrentHashMap

這事實上是一個真正的問題,而且是一個很容易做出AddElementToQueue是一個同步的方法解決。然後,在任何給定時間內,AddElementToQueue中只能有一個線程,因此第一個「解鎖」和第二個「鎖定」之間的同步孔關閉。

因此

public synchronized void addElementToQueue(String key, String value){

應該解決您丟失的隊列問題。

+0

那會明智的,如果一個ConcurrentHashMap在檢索時使用鎖定。它沒有。 – spudone 2014-10-21 22:56:06

+0

啊對,ConcurrentHashMap「僅僅」使用了多讀者彈性的數據結構。鎖定只能由作家在從地圖添加或刪除元素所需的短暫時間內使用。我一直忘記這個實現細節,因爲從程序的角度來看,這並不重要。這個問題仍然發生在同一個地方(同時「X存在於HashMap?」與「創建新HashMap」同時)並且解決方案是相同的 - 同步addElementToQueue方法。 – 2014-10-21 23:33:34

+0

使用synchronized塊,他並不需要併發數據結構,除非代碼更多。 – spudone 2014-10-21 23:41:53

0

如果你的Java 8是一個選項:

public void addElementToQueue(String key, String value) { 
    map.merge(key, new ConcurrentLinkedQueue<>(Arrays.asList(value)), (oldValue, coming) -> { 
     oldValue.addAll(coming); 
     return oldValue; 
    }); 
}