2009-08-06 32 views
0

我正在尋找一個併發對象,可以幫助在以下用例:併發對象作家通吃優先級超額讀者

  • 線程/實體:1個出版商(唯一),O-許多讀取器經常/不正確地更新數據結構,需要以最小的延遲快速地更新數據結構
  • 每個讀取器都具有對數據結構的讀取訪問權限(通過不允許寫入的東西或者因爲讀取器暗示承諾不改變數據)
  • 每個讀者我只要它能夠檢測到發佈者何時來修改它,因爲它知道它最終將獲得足夠的時間來閱讀它所需的數據結構。

有什麼建議嗎?我可以使用ReentrantReadWriteLock,但有點擔心阻止發佈者。我寧願讓出版商能夠毀掉讀者閱讀的機會,而不是讓讀者能夠容忍出版商。

出版商螺紋:

PublisherSignal ps = new PublisherSignal(); 
publishToAllReaders(ps.getReaderSignal()); 

    ... 

while (inLoop()) 
{ 
     ps.beginEdit(); 
     data.setSomething(someComputation()); 
     data.setSomethingElse(someOtherComputation()); 
     ps.endEdit(); 

     doOtherStuff(); 
} 

讀線程:

PublisherSignal.Reader rs = acquireSignalFromPublisher(); 

     ... 

while (inLoop()) 
{ 
     readDataWhenWeGetAChance(); 

     doOtherStuff(); 
} 

     ... 

public readDataWhenWeGetAChance() 
{ 
     while (true) 
     { 
      rs.beginRead(); 
      useData(data.getSomething(), data.getSomethingElse()); 
      if (rs.endRead()) 
      { 
       // we get here if the publisher hasn't done a beginEdit() 
       // during our read. 
       break; 
      } 

      // darn, we have to try again. 
      // might as well yield thread if appropriate 
      rs.waitToRead(); 
     } 
} 

編輯:在較高的水平,我試圖做的是有出版商變化數據數千倍第二,然後讓讀者以更慢的速度顯示最新的更新(每秒5-10次)。我將使用ConcurrentLinkedQueue來發布更新已發生的事實,除了(a)可能有數百個更新在同一個項目上,我想合併,因爲不得不復制大量數據 看起來像是浪費 是一個性能問題,(b)擁有多個閱讀器似乎排除了一個隊列......我想我可以有一個主代理閱讀器並讓它通知每個真實的閱讀器。

回答

0

嗯...我想我的絆腳石是圍繞着共享的數據結構本身......我一直在使用類似

public class LotsOfData 
{ 
     int fee; 
     int fi; 
     int fo; 
     int fum; 

     long[] other = new long[123]; 

     /* other fields too */ 
} 

哪裏發佈者經常更新數據,但一次只能有一個字段。

聽起來也許我應該找到一種方法,序列化更新的方式,這有利於使用生產者 - 消費者隊列:

public class LotsOfData 
{ 
     enum Field { FEE, FI, FO, FUM }; 
     Map<Field, Integer> feeFiFoFum = new EnumMap<Field, Integer>(); 

     long[] other = new long[123]; 

     /* other fields too */ 
} 

,然後發佈變更的項目到一個隊列,像(FEE,23 )爲feeFiFoFum字段,(33,1234567L)爲other陣列。 (出於性能的原因,Bean類型的反射幾乎肯定會出現。)

不過,看起來好像我被髮布者寫的任何想要的顯而易見的簡單所寵壞,並且知道讀者將有時間(最終)進入並獲得一個一致的一組數據,如果它只有一個標誌,它可以用來判斷數據是否已被修改。

更新:有趣,我嘗試這個方法,將具有突變的對象(只存儲必需的1個變化的狀態)的一個的ConcurrentLinkedQueue爲類似於第一LotsOfData上述(4個int字段和27名多頭陣列的類),以及一個生產者,它在大約10000個批次之間產生總計1000萬個帶有Thread.sleep(1)的變異,以及一個每隔100毫秒檢查一次隊列的消費者,並消費出現的任何突變。我跑測試在許多方面:

    測試框架內
  • 空的動作(只是循環1000次,調用了Thread.sleep(1),並檢查是否使用空對象):1.95秒,在我的3GHz奔騰4運行jre6u13。
  • 測試動作1 - 創建並僅在生產者端應用突變:4.3秒
  • 測試動作2 - 創建並在生產者端應用的突變,將每個隊列,因爲它們被創建:12秒

所以平均230nsec創建每個突變對象,平均770nsec將每個突變對象排入/出隊到生產者的隊列中並在消費者中將其拉出(對於基元類型執行突變的時間似乎可以忽略不計,與對象創建和隊列操作相比,應該是這樣)。不錯,我想,它給了我一些指標來估計這種方法的性能成本。

1

爲什麼不使用BlockingQueue

您的發佈者可以寫入此隊列,而不管任何正在閱讀的內容。讀者(同樣)可以從隊列中取走東西,而不用擔心阻止作者。線程安全由隊列處理,因此2個線程可以寫入/讀取,不需要進一步的同步等。

從鏈接DOC:

class Producer implements Runnable { 
    private final BlockingQueue queue; 
    Producer(BlockingQueue q) { queue = q; } 
    public void run() { 
    try { 
     while(true) { queue.put(produce()); } 
    } catch (InterruptedException ex) { ... handle ...} 
    } 
    Object produce() { ... } 
} 

class Consumer implements Runnable { 
    private final BlockingQueue queue; 
    Consumer(BlockingQueue q) { queue = q; } 
    public void run() { 
    try { 
     while(true) { consume(queue.take()); } 
    } catch (InterruptedException ex) { ... handle ...} 
    } 
    void consume(Object x) { ... } 
} 
+0

看到我的評論我補充說。 – 2009-08-06 14:05:52

+0

我會讓一個消費者獲取數據,合併它,然後將其傳遞給多個下游消費者(可能通過其他隊列?) – 2009-08-06 14:07:09