有人知道開源的BufferedIterator,其中後面的N個元素是在後臺線程上熱切地獲取的嗎? 這是an implementation從TechRepublic article,但我認爲它還沒有經過徹底測試。BufferedIterator的實現
Iterators .buffer(Iterator toBuffer,int bufferSize)對Guava來說是一個很好的補充,有沒有考慮過?
有人知道開源的BufferedIterator,其中後面的N個元素是在後臺線程上熱切地獲取的嗎? 這是an implementation從TechRepublic article,但我認爲它還沒有經過徹底測試。BufferedIterator的實現
Iterators .buffer(Iterator toBuffer,int bufferSize)對Guava來說是一個很好的補充,有沒有考慮過?
出現鏈接的實施已經寫的Java 4,可以使用番石榴和java.util.concurrent
被簡化了一點:
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.Executors;
public abstract class Iterators2 {
public static <E> Iterator<E> buffer(final Iterator<E> source,
int capacity) {
return buffer(source, capacity, defaultExecutor);
}
public static <E> Iterator<E> buffer(final Iterator<E> source,
int capacity,
final ExecutorService exec) {
if (capacity <= 0) return source;
final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(capacity);
// Temporary storage for an element we fetched but could not fit in the queue
final AtomicReference<E> overflow = new AtomicReference<E>();
final Runnable inserter = new Runnable() {
@SuppressWarnings("unchecked")
public void run() {
E next = (E) END_MARKER;
if (source.hasNext()) {
next = source.next();
// ArrayBlockingQueue does not allow nulls
if (next == null) next = (E) NULL_MARKER;
}
if (queue.offer(next)) {
// Keep buffering elements as long as we can
if (next != END_MARKER) exec.submit(this);
} else {
// Save the element. This also signals to the
// iterator that the inserter thread is blocked.
overflow.lazySet(next);
}
}
};
// Fetch the first element.
// The inserter will resubmit itself as necessary to fetch more elements.
exec.submit(inserter);
Iterator<E> iterator = new AbstractIterator<E>() {
protected E computeNext() {
try {
E next = queue.take();
E overflowElem = overflow.getAndSet(null);
if (overflowElem != null) {
// There is now a space in the queue
queue.put(overflowElem);
// Awaken the inserter thread
exec.submit(inserter);
}
if (next == END_MARKER) {
return endOfData();
} else if (next == NULL_MARKER) {
return null;
} else {
return next;
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return endOfData();
}
}
};
return iterator;
}
protected Iterators2() {
throw Throwables.propagate(new InstantiationException(Iterators2.class + " is a static class and cannot be instantiated"));
}
private static ExecutorService defaultExecutor =
java.util.concurrent.Executors.newCachedThreadPool(Executors.daemonThreadFactory());
private static final Object END_MARKER = new Object();
private static final Object NULL_MARKER = new Object();
}
注:以上執行不嘗試處理源異常迭代器(如果拋出一個,插入器任務將突然終止,導致調用線程死鎖。)
爲什麼你會接受一個ExecutorService,你什麼時候想要一個守護線程工廠以外的東西,就像你默認做的那樣? – 2010-01-28 01:49:04
您可能想要從固定大小的池中分配線程。或者您可能想要修改默認優先級。或者您可能想要跟蹤所有線程,以防在數據庫連接斷開時需要殺死它們。使用現有接口('ExecutorService'和'ThreadFactory')更簡單,而不是爲所有不同的選項添加一堆重載。 – finnw 2010-01-28 02:14:01
請注意,此實現不會適當地處理來自源迭代器的異常。 – 2010-02-01 19:19:20
這可能是一個合理的番石榴特徵請求。 http://code.google.com/p/guava-libraries/issues/entry – 2010-01-28 16:43:52
完成:http://code.google.com/p/guava-libraries/issues/detail?id=318 – 2010-01-28 18:54:48