2010-01-27 79 views
7

有人知道開源的BufferedIterator,其中後面的N個元素是在後臺線程上熱切地獲取的嗎? 這是an implementationTechRepublic article,但我認爲它還沒有經過徹底測試。BufferedIterator的實現

Iterators .buffer(Iterator toBuffer,int bufferSize)對Guava來說是一個很好的補充,有沒有考慮過?

+0

這可能是一個合理的番石榴特徵請求。 http://code.google.com/p/guava-libraries/issues/entry – 2010-01-28 16:43:52

+0

完成:http://code.google.com/p/guava-libraries/issues/detail?id=318 – 2010-01-28 18:54:48

回答

4

出現鏈接的實施已經寫的Java 4,可以使用番石榴和java.util.concurrent被簡化了一點:

import java.util.Iterator; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.atomic.AtomicReference; 

import com.google.common.base.Throwables; 
import com.google.common.collect.AbstractIterator; 
import com.google.common.util.concurrent.Executors; 

public abstract class Iterators2 { 
    public static <E> Iterator<E> buffer(final Iterator<E>  source, 
             int     capacity) { 
     return buffer(source, capacity, defaultExecutor); 
    } 

    public static <E> Iterator<E> buffer(final Iterator<E>  source, 
             int     capacity, 
             final ExecutorService exec) { 
     if (capacity <= 0) return source; 
     final BlockingQueue<E> queue = new ArrayBlockingQueue<E>(capacity); 

     // Temporary storage for an element we fetched but could not fit in the queue 
     final AtomicReference<E> overflow = new AtomicReference<E>(); 
     final Runnable inserter = new Runnable() { 
      @SuppressWarnings("unchecked") 
      public void run() { 
       E next = (E) END_MARKER; 
       if (source.hasNext()) { 
        next = source.next(); 
        // ArrayBlockingQueue does not allow nulls 
        if (next == null) next = (E) NULL_MARKER; 
       } 
       if (queue.offer(next)) { 
        // Keep buffering elements as long as we can 
        if (next != END_MARKER) exec.submit(this); 
       } else { 
        // Save the element. This also signals to the 
        // iterator that the inserter thread is blocked. 
        overflow.lazySet(next); 
       } 
      } 
     }; 
     // Fetch the first element. 
     // The inserter will resubmit itself as necessary to fetch more elements. 
     exec.submit(inserter); 
     Iterator<E> iterator = new AbstractIterator<E>() { 
      protected E computeNext() { 
       try { 
        E next = queue.take(); 
        E overflowElem = overflow.getAndSet(null); 
        if (overflowElem != null) { 
         // There is now a space in the queue 
         queue.put(overflowElem); 
         // Awaken the inserter thread 
         exec.submit(inserter); 
        } 
        if (next == END_MARKER) { 
         return endOfData(); 
        } else if (next == NULL_MARKER) { 
         return null; 
        } else { 
         return next; 
        } 
       } catch (InterruptedException ex) { 
        Thread.currentThread().interrupt(); 
        return endOfData(); 
       } 
      } 
     }; 

     return iterator; 
    } 

    protected Iterators2() { 
     throw Throwables.propagate(new InstantiationException(Iterators2.class + " is a static class and cannot be instantiated")); 
    } 

    private static ExecutorService defaultExecutor = 
     java.util.concurrent.Executors.newCachedThreadPool(Executors.daemonThreadFactory()); 

    private static final Object END_MARKER = new Object(); 

    private static final Object NULL_MARKER = new Object(); 
} 

注:以上執行不嘗試處理源異常迭代器(如果拋出一個,插入器任務將突然終止,導致調用線程死鎖。)

+0

爲什麼你會接受一個ExecutorService,你什麼時候想要一個守護線程工廠以外的東西,就像你默認做的那樣? – 2010-01-28 01:49:04

+2

您可能想要從固定大小的池中分配線程。或者您可能想要修改默認優先級。或者您可能想要跟蹤所有線程,以防在數據庫連接斷開時需要殺死它們。使用現有接口('ExecutorService'和'ThreadFactory')更簡單,而不是爲所有不同的選項添加一堆重載。 – finnw 2010-01-28 02:14:01

+1

請注意,此實現不會適當地處理來自源迭代器的異常。 – 2010-02-01 19:19:20