2015-10-20 44 views
3

我有一個Metrics類,它應該跟蹤我們每秒處理多少個事務以及它們需要多長時間。其結構的相關部分看起來是這樣的:多個變量之間的java線程安全

public class Metrics { 
    AtomicLong sent = new AtomicLong(); 
    AtomicLong totalElapsedMsgTime = new AtomicLong(); 

    AtomicLong sentLastSecond = new AtomicLong(); 
    AtomicLong avgTimeLastSecond = new AtomicLong(); 

    public void outTick(long elapsedMsgTime){ 
     sent.getAndIncrement(); 
     totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
    } 

    class CalcMetrics extends TimerTask { 
     @Override 
     public void run() { 
      sentLastSecond.set(sent.getAndSet(0)); 
      long tmpElapsed = totalElapsedMsgTime.getAndSet(0); 
      long tmpSent = sentLastSecond.longValue(); 

      if(tmpSent != 0) { 
       avgTimeLastSecond.set(tmpElapsed/tmpSent); 
      } else { 
       avgTimeLastSecond.set(0); 
      } 
     } 
    } 
} 

我的問題是,outTick函數將被調用上百次從許多不同的線程中的第二。 AtomicLong已經確保每個變量都是單獨的線程安全,並且它們不會在該函數中彼此交互,所以我不希望一個鎖會讓一個調用outTick阻塞另一個線程調用outTick。如果有幾個不同的線程遞增發送的變量,然後它們都添加到totalElapsedMsgTime變量中,那就很好了。

但是,一旦進入CalcMetrics運行方法(它只發生一次每秒一次),它們就會交互。我希望確保能夠在不進行outTick調用的情況下重置這兩個變量,或者在接收一個變量和下一個變量之間發生另一個outTick調用。

有沒有辦法做到這一點? (我的解釋是否有意義?)有沒有辦法說A不能與B交錯,但是多個B可以相互交錯?


編輯:

我去,詹姆斯建議ReadWriteLock中。以下是我對任何感興趣的人的結果:

public class Metrics { 
    AtomicLong numSent = new AtomicLong(); 
    AtomicLong totalElapsedMsgTime = new AtomicLong(); 

    long sentLastSecond = 0; 
    long avgTimeLastSecond = 0; 

    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 
    private final Lock readLock = readWriteLock.readLock(); 
    private final Lock writeLock = readWriteLock.writeLock(); 

    public void outTick(long elapsedMsgTime) { 
     readLock.lock(); 
     try { 
      numSent.getAndIncrement(); 
      totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
     } 
     finally 
     { 
      readLock.unlock(); 
     } 
    } 

    class CalcMetrics extends TimerTask { 

     @Override 
     public void run() { 
      long elapsed; 

      writeLock.lock(); 
      try { 
       sentLastSecond = numSent.getAndSet(0); 
       elapsed = totalElapsedMsgTime.getAndSet(0); 
      } 
      finally { 
       writeLock.unlock(); 
      } 

      if(sentLastSecond != 0) { 
       avgTimeLastSecond = (elapsed/sentLastSecond); 
      } else { 
       avgTimeLastSecond = 0; 
      } 
     } 
    } 
} 

回答

0

聽起來像你需要一個讀寫器鎖。 (java.util.concurrent.locks.ReentrantReadWriteLock)。

您的outTick()函數會鎖定ReaderLock。允許任意數量的線程同時鎖定ReaderLock。您的calcMetrics()會鎖定WriterLock。一旦一個線程等待寫作者鎖定,就不允許新的讀者進入,並且只有當所有的讀者都不在時,作者才被允許進入。

您仍然需要原子來保護單個計數器,這些計數器會增加outTick()

+0

謝謝。這看起來像我在找什麼。在我的特定用例中,「讀取」和「寫入」有點用詞不當,因爲我希望在更新它們時使用併發性(並因此使用ReaderLock),並在收集結果時使用專用WriterLock,但至少ReadWriteLock用相同的變量來解決排他性和併發區域兩方面的問題。 (我對這種語言仍然很陌生 - 非常需要學習。) – Lesley

+0

@Lesley,有時它也被稱爲「共享/排他」鎖。 –

0

使用鎖(https://docs.oracle.com/javase/tutorial/essential/concurrency/locksync.html)。一旦你實現了鎖定,你將擁有更好的控制權。另外一個副作用是你將不再需要使用AtomicLong(儘管你仍然可以);您可以使用易變長來代替,這會更有效率。我沒有在這個例子中做出改變。

基本上只是創建一個新的對象:

private Object lock = new Object(); 

然後,使用synchronized關鍵字與周圍的一切在同一時間與同一把鎖的另一個synchronized塊不應該發生的代碼的對象。例如:

synchronized(lock) 
{ 
    sent.getAndIncrement(); 
    totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
} 

所以,你的整個程序看起來就像這樣(注:未經測試的代碼)

public class Metrics { 
    private Object lock = new Object(); 

    AtomicLong sent = new AtomicLong(); 
    AtomicLong totalElapsedMsgTime = new AtomicLong(); 

    AtomicLong sentLastSecond = new AtomicLong(); 
    AtomicLong avgTimeLastSecond = new AtomicLong(); 

    public void outTick(long elapsedMsgTime){ 
     synchronized (lock) 
     { 
      sent.getAndIncrement(); 
      totalElapsedMsgTime.getAndAdd(elapsedMsgTime); 
     } 
    } 

    class CalcMetrics extends TimerTask { 
     @Override 
     public void run() { 
      synchronized (lock) 
      { 
       sentLastSecond.set(sent.getAndSet(0)); 
       long tmpElapsed = totalElapsedMsgTime.getAndSet(0); 
       long tmpSent = sentLastSecond.longValue(); 

       if(tmpSent != 0) { 
        avgTimeLastSecond.set(tmpElapsed/tmpSent); 
       } else { 
        avgTimeLastSecond.set(0); 
       } 
      } 
     } 
    } 
} 

編輯:我扔一起快速(醜)效率測試程序,並發現,當我與鎖同步,我得到整體更好的性能。請注意,由於Java JIT尚未將所有代碼路徑編譯爲機器碼時的計時結果不代表長期運行時間,所以前2次運行的結果將被丟棄。

結果:

  • 隨着鎖:8365ms
  • 的AtomicLong:21254ms

代碼:

import java.util.concurrent.atomic.AtomicLong; 

public class Main 
{ 
    private AtomicLong testA_1 = new AtomicLong(); 
    private AtomicLong testB_1 = new AtomicLong(); 

    private volatile long testA_2 = 0; 
    private volatile long testB_2 = 0; 

    private Object lock = new Object(); 

    private volatile boolean a = false; 
    private volatile boolean b = false; 
    private volatile boolean c = false; 

    private static boolean useLocks = false; 

    public static void main(String args[]) 
    { 
     System.out.println("Locks:"); 
     useLocks = true; 
     test(); 

     System.out.println("No Locks:"); 
     useLocks = false; 
     test(); 

     System.out.println("Locks:"); 
     useLocks = true; 
     test(); 

     System.out.println("No Locks:"); 
     useLocks = false; 
     test(); 
    } 

    private static void test() 
    { 
     final Main main = new Main(); 

     new Thread() 
     { 
      public void run() 
      { 
       for (int i = 0; i < 80000000; ++i) 
        main.outTick(10); 

       main.a = true; 
      } 
     }.start(); 

     new Thread() 
     { 
      public void run() 
      { 
       for (int i = 0; i < 80000000; ++i) 
        main.outTick(10); 

       main.b = true; 
      } 
     }.start(); 

     new Thread() 
     { 
      public void run() 
      { 
       for (int i = 0; i < 80000000; ++i) 
        main.outTick(10); 

       main.c = true; 
      } 
     }.start(); 

     long startTime = System.currentTimeMillis(); 

     // Okay this isn't the best way to do this, but it's good enough 
     while (!main.a || !main.b || !main.c) 
     { 
      try 
      { 
       Thread.sleep(1); 
      } catch (InterruptedException e) 
      { 
      } 
     } 

     System.out.println("Elapsed time: " + (System.currentTimeMillis() - startTime) + "ms"); 
     System.out.println("Test A: " + main.testA_1 + " " + main.testA_2); 
     System.out.println("Test B: " + main.testB_1 + " " + main.testB_2); 
     System.out.println(); 
    } 

    public void outTick(long elapsedMsgTime) 
    { 
     if (!useLocks) 
     { 
      testA_1.getAndIncrement(); 
      testB_1.getAndAdd(elapsedMsgTime); 
     } 
     else 
     { 
      synchronized (lock) 
      { 
       ++testA_2; 
       testB_2 += elapsedMsgTime; 
      } 
     } 
    } 
} 
+0

如果我不能得到我想要的工作,類似這樣的東西將是我的備用解決方案。我的問題是,它阻止超過我想阻止。 (多次調用outTick會阻塞_each other_out,這將比每秒一次的定時器更頻繁地發生) 然後,也許能夠將AtomicLongs更改爲volatile變量將獲得與額外的阻塞一樣多的效率成本。我不確定那一部分。 – Lesley

+0

他們已經互相阻攔了。 AtomicLong只是爲你阻塞,所以你沒有看到它,但效率將是相同的。更不用說對原始類型進行操作比調用方法效率更高。只要*確保他們是易變的*。 –

+0

@Lesley看到我的代碼展示了更好的鎖定效率。實際上,使用AtomicLong的鎖(而不是volatile,long,作爲示例代碼)也證明比沒有鎖的AtomicLong更有效,因爲AtomicLong的暴力衝突解決方法比Java鎖更加低效。 –

1

通常的解決方案是包裝的所有變量如一個原子數據類型。

class Data 
{ 
    long v1, v2; 

    Data add(Data another){ ... } 
} 

AtomicReference<Data> aData = ...; 

public void outTick(long elapsedMsgTime) 
{ 
    Data delta = new Data(1, elapsedMsgTime); 

    aData.accumulateAndGet(delta, Data:add); 
}  

就你而言,它可能不會比只是鎖定更快。

java8還有另一個有趣的鎖 - StampedLock。 javadoc示例與您的用例非常相似。基本上,你可以對多個變量進行樂觀的讀取;之後,檢查以確保在讀取期間沒有寫入。就你的情況而言,每秒寫入「數百」,樂觀的讀取大多會成功。