2013-03-06 84 views
1

我知道,我可以使用JMS和ActiveMQ的限制,但我真正需要的東西很簡單,沒有大量的開銷,可以將數據保存到硬盤驅動器。我用ActiveMQ做了一些測試,並且不太喜歡持久隊列的性能。是否有任何Java阻塞隊列,當達到

我正在尋找的是基本實現任何阻塞隊列與存儲在HDD(理想情況下),如果達到一定的大小限制信息的能力。然後,它應該能夠從HDD讀取存儲的消息,並且如果可能的話,停止向HDD寫入新的內容(在內存使用中恢復)。

我的方案很簡單 - 消息(JSON)從外界傳來。我做了一些處理,然後將它們發送到另一個REST服務。當目標REST服務關閉或者我們之間的網絡不好時,可能會發生問題。在這種情況下,準備好的事件將存儲在可能填滿所有可用內存的隊列中。我不希望/不需要將所有消息寫入HDD/DB - 只有那些不適合內存的消息。

謝謝!

+1

你所要求的不是'東西很simple'。你可能想要'可靠的東西'。 – 2013-03-06 17:52:35

+0

ehcache是​​我知道的透明地將數據移入或移出磁盤的最簡單的方法。如果隊列順序很重要,你需要自己處理。 – Affe 2013-03-06 18:01:45

+0

是的,隊列順序很重要。另外,當我說 - 「非常簡單的事情」時,我的意思是我不需要羣集企業解決方案(因爲我可以使用ActiveMQ)。所有的魔法都應該發生在1個JVM中。另外一個很好的功能 - 如果JVM停止 - 如果有任何消息,則從HDD填充隊列。 – Alex 2013-03-06 18:26:48

回答

0

此代碼應爲你工作 - 它在內存中的持久阻塞隊列的 - 需要一些文件的調優,但應工作

package test; 

    import java.io.BufferedReader; 
    import java.io.BufferedWriter; 
    import java.io.File; 
    import java.io.FileReader; 
    import java.io.FileWriter; 
    import java.io.IOException; 
    import java.util.ArrayList; 
    import java.util.Collections; 
    import java.util.LinkedList; 
    import java.util.List; 

    public class BlockingQueue { 

    //private static Long maxInMenorySize = 1L; 
    private static Long minFlushSize = 3L; 

    private static String baseDirectory = "/test/code/cache/"; 
    private static String fileNameFormat = "Table-"; 

    private static String currentWriteFile = ""; 

    private static List<Object> currentQueue = new LinkedList<Object>(); 
    private static List<Object> lastQueue = new LinkedList<Object>(); 

    static{ 
     try { 
      load(); 
     } catch (IOException e) { 
      System.out.println("Unable To Load"); 
      e.printStackTrace(); 
     } 
    } 

    private static void load() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()==0){ 
      //currentQueue = lastQueue = new ArrayList<Object>(); 
      currentWriteFile = baseDirectory + "Table-1"; 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString()+ "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 
     }else{ 
      if(fileList.size()>0){ 
        BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
        String line=null; 
        while ((line=reader.readLine())!=null){ 
         currentQueue.add(line); 
        } 
        reader.close(); 
        File toDelete = new File(fileList.get(0)); 
        toDelete.delete(); 
      } 

      if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(fileList.size()-1))); 
       currentWriteFile = fileList.get(fileList.size()-1); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        lastQueue.add(line); 
       } 
       reader.close(); 
       //lastFileNameIndex=Long.parseLong(fileList.get(fileList.size()).substring(6, 9)); 
      } 
     } 

    } 

    private void loadFirst() throws IOException{ 
     File baseLocation = new File(baseDirectory); 
     List<String> fileList = new ArrayList<String>(); 

     for(File entry : baseLocation.listFiles()){ 
      if(!entry.isDirectory() && entry.getName().contains(fileNameFormat)){ 
       fileList.add(entry.getAbsolutePath()); 
      } 
     } 

     Collections.sort(fileList); 

     if(fileList.size()>0){ 
       BufferedReader reader = new BufferedReader(new FileReader(fileList.get(0))); 
       String line=null; 
       while ((line=reader.readLine())!=null){ 
        currentQueue.add(line); 
       } 
       reader.close(); 
       File toDelete = new File(fileList.get(0)); 
       toDelete.delete(); 
     } 
    } 

    public Object pop(){ 
     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 

     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     if(currentQueue.size()>0) 
      return currentQueue.remove(0); 
     else 
      return null; 
    } 

    public synchronized Object waitTillPop() throws InterruptedException{ 
     if(currentQueue.size()==0){ 
      try { 
       loadFirst(); 
      } catch (IOException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      if(currentQueue.size()==0) 
       wait(); 
     } 
     return currentQueue.remove(0); 
    } 

    public synchronized void push(Object data) throws IOException{ 
     lastQueue.add(data); 
     this.notifyAll(); 
     if(lastQueue.size()>=minFlushSize){ 
      BufferedWriter writer = new BufferedWriter(new FileWriter(currentWriteFile)); 
      while (!lastQueue.isEmpty()){ 
       writer.write(lastQueue.get(0).toString() + "\n"); 
       lastQueue.remove(0); 
      } 
      writer.close(); 

      currentWriteFile = currentWriteFile.substring(0,currentWriteFile.indexOf("-")+1) + 
        (Integer.parseInt(currentWriteFile.substring(currentWriteFile.indexOf("-")+1,currentWriteFile.length())) + 1); 
     } 
    } 

    public static void main(String[] args) { 
     try { 
      BlockingQueue bq = new BlockingQueue(); 

      for(int i =0 ; i<=8 ; i++){ 
       bq.push(""+i); 
      } 

      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 
      System.out.println(bq.pop()); 

      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 
      System.out.println(bq.waitTillPop()); 



     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 


} 
+0

良好的嘗試,所以不會投票,但我沒有辦法在生產環境中相信這一點,因爲JVM可能會在事物持久化到磁盤之前崩潰。 – user924272 2017-10-02 23:08:49

0

好了,有你的隊列保存在磁盤上會工作,如果你回你的隊列一個RandomAccessFile,一個MemoryMappedFile或一個MappedByteBuffer ..或其他等效的實現。 如果您的JVM過早崩潰或終止,幾乎可以依靠您的操作系統將未提交的緩衝區保留到磁盤。需要注意的是,如果您的機器事先崩潰,您可以告別隊列中的任何更新,因此請確保您瞭解這一點。 儘管遭受嚴重的性能下降,但您仍可以同步磁盤以獲得有保證的持久性。 從更加核心的角度來看,另一種選擇是複製到另一臺機器以獲得冗餘,鑑於其複雜性,這需要單獨的答案。