2016-06-14 86 views
1

我在尋找這樣一種隊列:它最多存儲N個元素,每個元素只保留一定的時間(例如10秒);如果隊列已滿,則應丟棄最舊的值。也就是一個帶生存時間的消息隊列。

我在Apache Commons Collections中發現了一個類似的隊列(CircularFifoQueue,參見其JavaDoc),但它沒有考慮生存時間這一方面。而一個功能完備(full-fledged)的消息代理對我的小項目來說開銷又太大。

你介意給我一個提示,我應該如何實現這一點?當我輪詢元素時,我應該過濾掉舊的值嗎?

回答

1

我最終採用了以下的隊列實現。代碼很大程度上基於Apache的CircularFifoQueue,並且只經過了少量測試。此外,該實現不是線程安全的,也不可序列化。

如果您發現錯誤,請留下評論。

import java.util.AbstractCollection; 
import java.util.Arrays; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.NoSuchElementException; 
import java.util.Queue; 
import java.util.concurrent.TimeUnit; 

/**
 * TimedQueue is a first-in first-out queue with a fixed capacity whose
 * elements expire after a configurable time to live.
 * <p>
 * When an element is added to a queue that is already full of live elements,
 * the oldest element is discarded to make room. Independently of that, an
 * element stops being visible once its age reaches the configured delay.
 * Expired elements are discarded lazily: every public operation first drops
 * the expired elements at the head of the queue, no background thread is used.
 * <p>
 * The removal order of a {@link TimedQueue} is based on the insertion order;
 * elements are removed in the same order in which they were added. The
 * iteration order is the same as the removal order. Because elements are
 * stamped on insertion with a non-decreasing clock, the expired elements
 * always form a prefix of the queue.
 * <p>
 * The {@link #add(Object)}, {@link #remove()}, {@link #peek()}, {@link #poll()},
 * {@link #offer(Object)}, {@link #size()} and {@link #get(int)} operations
 * perform in amortised constant time (every element is purged at most once).
 * Removing through the iterator is linear in the number of stored elements.
 * <p>
 * This queue prevents null objects from being added and it is not thread-safe
 * and not serializable.
 * <p>
 * The design is derived from Apache Commons Collections'
 * {@code CircularFifoQueue}:
 * http://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/queue/CircularFifoQueue.html
 *
 * @param <E> the type of elements held in this queue
 * @version 1.1
 */
public class TimedQueue<E> extends AbstractCollection<E>
        implements Queue<E> {

    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_CAPACITY = 32;

    /** Time to live, in seconds, used when the caller does not supply one. */
    private static final int DEFAULT_DELAY_SECONDS = 3;

    /** Underlying circular storage array; unused slots hold {@code null}. */
    private final Item<E>[] elements;

    /** Capacity of the queue. */
    private final int maxElements;

    /** Age, in milliseconds, at which an element is considered expired. */
    private final long ttlMillis;

    /**
     * Sequence number of the current head element. It grows by one each time
     * an element leaves the head (removed, evicted or expired) and is never
     * reset, so iterators can detect that their position has been overtaken.
     * The element with sequence number {@code s} lives in slot
     * {@code s % maxElements}.
     */
    private long headSeq = 0;

    /** Number of elements currently stored (live or not yet purged). */
    private int count = 0;

    /**
     * Constructor that creates a queue with the default size of 32 and the
     * default time to live of 3 seconds.
     */
    public TimedQueue() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Constructor that creates a queue with the specified size and the
     * default time to live of 3 seconds.
     *
     * @param size the size of the queue (cannot be changed)
     * @throws IllegalArgumentException if the size is &lt; 1
     */
    public TimedQueue(final int size) {
        this(size, DEFAULT_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Constructor that creates a queue with the specified size and time to live.
     * A delay of zero or less makes every element expire immediately.
     *
     * @param size  the size of the queue (cannot be changed)
     * @param delay how long an element stays in the queue, in {@code unit}s
     * @param unit  the unit of {@code delay}, may not be null
     * @throws IllegalArgumentException if the size is &lt; 1
     * @throws NullPointerException if the unit is null
     */
    @SuppressWarnings("unchecked") // generic array creation; the array never escapes this class
    public TimedQueue(final int size, int delay, TimeUnit unit) {
        if (size <= 0) {
            throw new IllegalArgumentException("The size must be greater than 0");
        }
        if (unit == null) {
            throw new NullPointerException("The time unit must not be null");
        }
        elements = new Item[size];
        maxElements = size;
        ttlMillis = unit.toMillis(delay);
    }

    /**
     * Constructor that creates a queue from the specified collection.
     * The collection size also sets the queue size.
     *
     * @param coll the collection to copy into the queue, may not be null
     * @throws NullPointerException if the collection is null or contains null
     * @throws IllegalArgumentException if the collection is empty
     */
    public TimedQueue(final Collection<? extends E> coll) {
        this(coll.size());
        addAll(coll);
    }

    /**
     * Returns the current reading, in milliseconds, of the clock used to age
     * elements. Only differences between two readings are meaningful. The
     * default implementation is based on {@link System#nanoTime()}, which is
     * monotonic, so adjusting the system wall clock cannot expire or revive
     * elements. Subclasses may override this method (for example in tests);
     * the values returned must never decrease.
     *
     * @return the current clock reading in milliseconds
     */
    protected long nowMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    /**
     * Returns the number of elements stored in the queue that have not expired.
     *
     * @return this queue's size
     */
    @Override
    public int size() {
        purgeExpired(nowMillis());
        return count;
    }

    /**
     * Returns true if this queue holds no unexpired element; false otherwise.
     *
     * @return true if this queue is empty
     */
    @Override
    public boolean isEmpty() {
        return size() == 0;
    }

    /**
     * Clears this queue.
     */
    @Override
    public void clear() {
        Arrays.fill(elements, null);
        // Advance instead of resetting so that live iterators see their position as removed.
        headSeq += count;
        count = 0;
    }

    /**
     * Adds the given element to this queue. If the queue is full of unexpired
     * elements, the least recently added element is discarded so that the new
     * element can be inserted.
     *
     * @param element the element to add
     * @return true, always
     * @throws NullPointerException if the given element is null
     */
    @Override
    public boolean add(final E element) {
        if (null == element) {
            throw new NullPointerException("Attempted to add null object to queue");
        }

        final long now = nowMillis();
        purgeExpired(now);
        if (count == maxElements) {
            dropHead();
        }

        elements[slot(headSeq + count)] = new Item<E>(element, now);
        count++;
        return true;
    }

    /**
     * Returns the element at the specified position in this queue, where
     * position 0 is the oldest unexpired element.
     *
     * @param index the position of the element in the queue
     * @return the element at position {@code index}
     * @throws NoSuchElementException if the requested position is outside the range [0, size)
     */
    public E get(final int index) {
        purgeExpired(nowMillis());
        if (count == 0) {
            throw new NoSuchElementException(String.format(
                    "The specified index (%1$d) is outside the available range because the queue is empty.",
                    Integer.valueOf(index)));
        }
        if (index < 0 || index >= count) {
            throw new NoSuchElementException(String.format(
                    "The specified index (%1$d) is outside the available range [0, %2$d]",
                    Integer.valueOf(index), Integer.valueOf(count - 1)));
        }
        return elements[slot(headSeq + index)].value;
    }

    //-----------------------------------------------------------------------

    /**
     * Adds the given element to this queue. If the queue is full of unexpired
     * elements, the least recently added element is discarded so that the new
     * element can be inserted.
     *
     * @param element the element to add
     * @return true, always
     * @throws NullPointerException if the given element is null
     */
    @Override
    public boolean offer(E element) {
        return add(element);
    }

    /**
     * Removes and returns the oldest unexpired element.
     *
     * @return the removed element, or {@code null} if the queue is empty
     */
    @Override
    public E poll() {
        purgeExpired(nowMillis());
        return count == 0 ? null : dropHead();
    }

    /**
     * Returns, without removing it, the oldest unexpired element.
     *
     * @return the head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E element() {
        final E head = peek();
        if (head == null) {
            throw new NoSuchElementException("queue is empty");
        }
        return head;
    }

    /**
     * Returns, without removing it, the oldest unexpired element.
     *
     * @return the head of the queue, or {@code null} if the queue is empty
     */
    @Override
    public E peek() {
        purgeExpired(nowMillis());
        return count == 0 ? null : elements[slot(headSeq)].value;
    }

    /**
     * Removes and returns the oldest unexpired element.
     *
     * @return the removed element
     * @throws NoSuchElementException if the queue is empty
     */
    @Override
    public E remove() {
        purgeExpired(nowMillis());
        if (count == 0) {
            throw new NoSuchElementException("queue is empty");
        }
        return dropHead();
    }

    /**
     * Returns an iterator over this queue's unexpired elements, oldest first.
     * <p>
     * The iterator is not fail-fast. Elements that expire, are removed from
     * the head, or are evicted while iterating are skipped. An element
     * announced by {@code hasNext()} is returned by the immediately following
     * {@code next()} even if it expires in between.
     *
     * @return an iterator over this queue's elements
     */
    @Override
    public Iterator<E> iterator() {
        purgeExpired(nowMillis());
        return new Iterator<E>() {

            /** Sequence number of the element the next call to next() returns. */
            private long nextSeq = headSeq;

            /** Sequence number of the element last returned, or -1 if none can be removed. */
            private long lastSeq = -1;

            /** True when hasNext() has just promised an element to the caller. */
            private boolean promised = false;

            @Override
            public boolean hasNext() {
                purgeExpired(nowMillis());
                promised = catchUp();
                return promised;
            }

            @Override
            public E next() {
                if (!promised) {
                    // Nothing was promised, so it is safe to drop what has expired meanwhile.
                    purgeExpired(nowMillis());
                }
                promised = false;
                if (!catchUp()) {
                    throw new NoSuchElementException();
                }
                lastSeq = nextSeq++;
                return elements[slot(lastSeq)].value;
            }

            @Override
            public void remove() {
                if (lastSeq < 0) {
                    throw new IllegalStateException();
                }
                final long target = lastSeq;
                lastSeq = -1;

                if (target < headSeq || target >= headSeq + count) {
                    // The element has already left the queue (expired, evicted or removed).
                    return;
                }
                if (target == headSeq) {
                    // First element can be removed quickly.
                    dropHead();
                    return;
                }

                // Close the gap by moving every younger element one slot towards the head.
                final long tailSeq = headSeq + count - 1;
                for (long seq = target; seq < tailSeq; seq++) {
                    elements[slot(seq)] = elements[slot(seq + 1)];
                }
                elements[slot(tailSeq)] = null;
                count--;
                // The element that would have been returned next moved down by one.
                nextSeq--;
            }

            /** Skips positions that have left the queue; reports whether an element remains. */
            private boolean catchUp() {
                if (nextSeq < headSeq) {
                    nextSeq = headSeq;
                }
                return nextSeq < headSeq + count;
            }
        };
    }

    /** Maps a sequence number to its slot in the circular storage array. */
    private int slot(final long seq) {
        return (int) (seq % maxElements);
    }

    /** Discards every expired element at the head of the queue, given the clock reading {@code now}. */
    private void purgeExpired(final long now) {
        // Expiry order equals insertion order, so it is enough to look at the head.
        while (count > 0 && now - elements[slot(headSeq)].creationTime >= ttlMillis) {
            dropHead();
        }
    }

    /** Unconditionally removes and returns the head element; the queue must not be empty. */
    private E dropHead() {
        final int idx = slot(headSeq);
        final E value = elements[idx].value;
        elements[idx] = null;
        headSeq++;
        count--;
        return value;
    }

    /** A stored value together with the clock reading taken when it was added. */
    private static final class Item<E> {
        private final E value;
        private final long creationTime;

        Item(final E value, final long creationTime) {
            this.value = value;
            this.creationTime = creationTime;
        }
    }
}
1

有一個叫LinkedHashMap的類,它有一個特殊的方法來刪除陳舊的數據。從文檔:

protected boolean removeEldestEntry(Map.Entry<K,V> eldest)
如果此映射應移除其最舊的條目,則返回true。

每當有條目被添加到映射(也就是這裡的隊列)時,都會調用removeEldestEntry方法。如果它返回true,最舊的條目將被刪除,以便為新條目騰出空間;否則不會刪除任何內容。您可以提供自己的實現,檢查最舊條目上的時間戳,如果它早於過期閾值(例如10秒),則返回true。因此,您的實現可能如下所示:

/**
 * Decides whether the eldest entry should be evicted: it is evicted once it
 * is more than ten seconds old.
 *
 * NOTE(review): as written, {@code eldest} is a raw {@code Map.Entry}, so
 * {@code getValue()} is typed as {@code Object}; the enclosing map must
 * declare a value type that exposes {@code getTimeCreation()} (and this
 * parameter must use it) for the call below to compile - confirm against the
 * enclosing class.
 *
 * @param eldest the least recently inserted entry of the map
 * @return true if the eldest entry is older than the expiry threshold
 */
protected boolean removeEldestEntry(Map.Entry eldest) {
    // Expiry threshold: 10 seconds, expressed in milliseconds.
    final long maxAgeMillis = 10 * 1000L;

    long currTimeMillis = System.currentTimeMillis();
    long entryTimeMillis = eldest.getValue().getTimeCreation();

    return (currTimeMillis - entryTimeMillis) > maxAgeMillis;
}
+0

我會檢查並保持張貼! – Markus

+0

這個答案太棒了! –

+0

只有在添加新條目時才調用removeEldestEntry方法。舊的值仍保留在地圖中。 – Markus

1

我覺得java.util.LinkedHashMap就是你的解決方案。它有一個removeEldestEntry()方法,每當向映射中放入一個條目時就會調用它。您可以覆蓋它,以指示是否應刪除最舊的條目。

的JavaDoc中給出了一個很好的例子:

/** Largest number of entries the map is allowed to keep. */
private static final int MAX_ENTRIES = 100;

/**
 * Asks for the eldest entry to be evicted once the map holds more than
 * {@code MAX_ENTRIES} entries.
 *
 * @param eldest the eldest entry of the map (not inspected here)
 * @return true if the map has grown beyond {@code MAX_ENTRIES}
 */
protected boolean removeEldestEntry(Map.Entry eldest) {
    final boolean overCapacity = size() > MAX_ENTRIES;
    return overCapacity;
}

這將刪除舊的條目,如果地圖上有超過100個條目。

10秒後主動移除物品將需要單獨的線程來檢查年齡並移除舊物品。我猜這不是你想要的,根據你的描述來判斷。

+0

這個答案應該是一個評論。 –

+0

這看起來不錯。但是,我想刪除超過10秒的項目。是否已有這種「基於時間的隊列」的實現? – Markus