2010-03-30 39 views
9

我有一個多線程應用程序編寫和讀取ConcurrentLinkedQueue,它在概念上用於備份列表/表中的條目。我最初使用了一個ConcurrentHashMap來運行,效果很好。需要跟蹤訂單條目的新要求已送達,因此可以根據某些條件以最早的一級訂單取消訂單條目。 ConcurrentLinkedQueue似乎是一個不錯的選擇,在功能上它運行良好。

可配置數量的條目保存在內存中,並且當達到限制時提供新條目時,會按照最老的第一順序搜索可以刪除的條目。系統不會刪除某些條目,並等待客戶端進行交互。

似乎正在發生的事情是我在隊列的前面出現了一個條目,比如說前面的100K條目。該隊列似乎具有有限數量的配置項(size()== 100),但在分析時,我發現內存中有〜100K ConcurrentLinkedQueue $ Node對象。這看起來是有意設計的,只是看一下ConcurrentLinkedQueue的源代碼,刪除操作只是刪除對被存儲對象的引用,但爲迭代而留下鏈接列表。

最後我的問題:是否有一種「更好」的懶惰方式來處理這種性質的集合?我喜歡ConcurrentLinkedQueue的速度,我不能承受在這種情況下可能出現的無限泄漏。如果沒有,似乎我不得不創建第二個結構來跟蹤訂單,並可能有相同的問題,再加上同步問題。

+0

如果您在此處找不到答案,請轉至http://altair.cs.oswego.edu/mai lman/listinfo/concurrency-interest並在那裏發佈你的問題。可能ConcurrentLinkedQueue的作者會給你一個決定性的答案。 – 2010-03-30 17:45:24

回答

9

這裏實際發生的是remove方法準備一個輪詢線程來清空鏈接的引用。

ConcurrentLinkedQueue是一個非阻塞的線程安全隊列實現。但是,當您嘗試從隊列中輪詢節點時,它是一個雙重功能的過程。首先你將該值歸零,然後你將參考歸零。 CAS操作是一個單獨的原子函數,不會爲投票提供快速解決方案。

當您輪詢時會發生什麼情況,即第一個成功的線程將獲取節點的值並將該值設爲null,然後該線程將嘗試將該引用歸零。然後可能會有另一個線程進入並嘗試從隊列中輪詢。爲了確保這個隊列擁有一個非阻塞屬性(即一個線程的失敗不會導致另一個線程的失敗),新的包含線程將會看到該值是否爲null,如果該線程爲null,則該線程將爲該參考歸零並嘗試再次輪詢()。

所以你看到發生在這裏的是刪除線程只是準備任何新的輪詢線程爲空引用。試圖實現非阻塞刪除功能我認爲幾乎是不可能的,因爲那需要三個原子功能。值爲null的值引用所述節點,最後是從該節點父節點到其後繼節點的新引用。

回答你最後一個問題。沒有更好的方法來實現刪除和維護隊列的非阻塞狀態。至少在這一點上。一旦處理器開始採用2和3方式外殼,那麼這是可能的。

+1

這是Michael和Scott Queue背後的基本原理。如果第一個線程出現故障並且在清除時「清除」,那麼下一個線程將能夠確保隊列沒有中斷。 http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html – Jeremy 2010-03-30 18:32:51

+0

@Jer精確 - 感謝您的參考鏈接。 CLQ確實使用MCS隊列作爲meeroend by jer – 2010-03-30 18:35:49

+1

非常好的解釋,謝謝。 – 2010-03-30 19:12:11

1

隊列的主要語義是add/poll。如果在ConcurrentLinkedQueue上使用poll(),它將被清理,因爲它應該。根據你的描述,poll()應該讓你刪除最舊的條目。爲什麼不用它來代替remove()

+0

在這種情況下,隊列中的第一個條目需要保留(在域規則下它不能被系統刪除)。你說得對,這是一個語義問題,我基本上是在尋找一個神奇的併發,非阻塞的列表實現,而且這非常接近,但沒有雪茄。 – 2010-03-30 19:13:40

1

查看1.6的源代碼。0_29,似乎CLQ的迭代器被修改爲嘗試刪除具有空項目的節點。相反的:

p = p.getNext(); 

的代碼是現在:

Node<E> next = succ(p); 
if (pred != null && next != null) 
    pred.casNext(p, next); 
p = next; 

這加入的修補程序的BUG:http://bugs.sun.com/view_bug.do?bug_id=6785442

事實上,當我嘗試以下方法,我得到一個OOME與舊版本,但不包括新版本:

Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); 
for (int i=0; i<10000; i++) 
{ 
    for (int j=0; j<100000; j++) 
    { 
     queue.add(j); 
    } 
    boolean start = true; 
    for (Iterator<Integer> iter = queue.iterator(); iter.hasNext();) 
    { 
     iter.next(); 
     if (!start) 
      iter.remove(); 
     start = false; 
    } 
    System.out.println(i); 
}