2012-02-13 108 views
7

在我的一個Java 6應用程序中,我有一個線程向數據主線程提供數據,同時還從數據庫中預取更多記錄。它採用ArrayBlockingQueue隊列爲FIFO緩衝區,主循環是沿着這些路線的東西:當線程中斷時,BlockingQueue方法是否總是拋出InterruptedException?

while (!Thread.interrupted()) { 
    if (source.hasNext()) { 
     try { 
      queue.put(source.next()) 
     } catch (InterruptedException e) { 
      break; 
     } 
    } else { 
     break; 
    } 
} 

有代碼,不會在循環後一點的清理結束,如中毒隊列並釋放任何資源,但這幾乎都是關於它的。

就目前而言,主線程沒有直接通信:饋線線程:饋線線程被設置了適當的選項,然後自己離開,使用阻塞隊列來控制數據流。

當隊列滿時主線程需要關閉饋線時出現問題。由於沒有直接的控制通道,因此關閉方法使用接口接口interrupt()饋線線程。不幸的是,在大多數情況下,饋線線程在put()仍然被阻止,儘管被中斷 - 沒有任何例外。

通過對interrupt()文檔和隊列實現源代碼的簡要介紹,在我看來,經常有put()塊沒有使用任何JVM的可中斷設施。更具體地說,在我當前的JVM(OpenJDK 1.6b22)上,它阻塞了sun.misc.Unsafe.park()本地方法。也許它使用自旋鎖或其他東西,但無論如何,這似乎落在the following case

如果以前的任何條件都不成立,則會設置此線程的中斷狀態。

一個狀態標誌設置,但線程仍然受阻於put(),並且不進一步遍歷而標誌進行檢查。結果?一個殭屍線程只是不會死

  1. 我對這個問題的理解是正確的,還是我錯過了什麼?

  2. 解決此問題的可能方法是什麼?現在我只能想到兩種解決方案:

    a。調用poll()一堆隊列上的疏通進紙器線程:醜陋和不是很可靠,從我所見過的,但它主要是作品。

    b。使用offer()方法用超時而不是put()來允許線程在可接受的時間範圍內檢查其中斷狀態。

除非我遺漏了一些東西,否則這是對Java中BlockingQueue實現的一個有點沒有記錄的警告。有似乎是它的一些跡象,當文件例如建議中毒隊列以關閉工作線程,但我找不到任何明確的參考。

編輯:

行,還有一個更,恩,急劇的上述溶液(a)的變化:ArrayBlockingQueue.clear()。我認爲這應該始終工作,即使它不完全是優雅的定義...

回答

7

我認爲有兩個可能的原因到您的問題。

  1. The Law of the Sabotaged Doorbell中所述,您可能無法正確處理中斷。您會發現:

    當我們調用可能導致InterruptedException的代碼時,應該怎麼辦?不要立即拔出電池!通常,該問題有兩個答案:

    從您的方法重新引發InterruptedException。這通常是最簡單和最好的方法。它由新的java.util.concurrent。*包使用,這解釋了爲什麼我們現在不斷接觸到這個異常。
    抓住它,設置中斷狀態,返回。如果您正在調用可能導致異常的代碼的循環中運行,則應該將狀態重置爲中斷狀態。

    例如:

    while (!Thread.currentThread().isInterrupted()) { 
        // do something 
        try { 
         TimeUnit.SECONDS.sleep(1000); 
        } catch (InterruptedException e) { 
         Thread.currentThread().interrupt(); 
         break; 
        } 
    } 
    
  2. 要麼source.hasNext()source.next()正在消耗並丟棄的中斷狀態。請參閱在下面添加以瞭解我如何解決此問題。

我相信,在ArrayBlockingqueue.put()中斷線程的有效解決方案。

新增

我使用CloseableBlockingQueue這可以從閱讀器端被關閉解決的問題2。這樣,一旦關閉,所有put調用將會快捷。然後您可以檢查作者隊列中的closed標誌。

// A blocking queue I can close from the pull end. 
// Please only use put because offer does not shortcut on close. 
// <editor-fold defaultstate="collapsed" desc="// Exactly what it says on the tin."> 
class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> { 
    // Flag indicates closed state. 
    private volatile boolean closed = false; 
    // All blocked threads. Actually this is all threads that are in the process 
    // of invoking a put but if put doesn't block then they disappear pretty fast. 
    // NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put. 
    private final Container<Thread> blocked; 

    // Limited size. 
    public CloseableBlockingQueue(int queueLength) { 
    super(queueLength); 
    blocked = new Container<Thread>(queueLength); 
    } 

    /** 
    * * 
    * Shortcut to do nothing if closed. 
    * 
    * Track blocked threads. 
    */ 
    @Override 
    public void put(E e) throws InterruptedException { 
    if (!closed) { 
     Thread t = Thread.currentThread(); 
     // Hold my node on the stack so removal can be trivial. 
     Container.Node<Thread> n = blocked.add(t); 
     try { 
     super.put(e); 
     } finally { 
     // Not blocked anymore. 
     blocked.remove(n, t); 
     } 
    } 
    } 

    /** 
    * 
    * Shortcut to do nothing if closed. 
    */ 
    @Override 
    public E poll() { 
    E it = null; 
    // Do nothing when closed. 
    if (!closed) { 
     it = super.poll(); 
    } 
    return it; 
    } 

    /** 
    * 
    * Shortcut to do nothing if closed. 
    */ 
    @Override 
    public E poll(long l, TimeUnit tu) throws InterruptedException { 
    E it = null; 
    // Do nothing when closed. 
    if (!closed) { 
     it = super.poll(l, tu); 
    } 
    return it; 
    } 

    /** 
    * 
    * isClosed 
    */ 
    boolean isClosed() { 
    return closed; 
    } 

    /** 
    * 
    * Close down everything. 
    */ 
    void close() { 
    // Stop all new queue entries. 
    closed = true; 
    // Must unblock all blocked threads. 

    // Walk all blocked threads and interrupt them. 
    for (Thread t : blocked) { 
     //log("! Interrupting " + t.toString()); 
     // Interrupt all of them. 
     t.interrupt(); 
    } 
    } 

    @Override 
    public String toString() { 
    return blocked.toString(); 
    } 
} 

您還需要Container這是無鎖和O(1)put/get(儘管它不是嚴格意義上的集合)。它在幕後使用了Ring

public class Container<T> implements Iterable<T> { 

    // The capacity of the container. 
    final int capacity; 
    // The list. 
    AtomicReference<Node<T>> head = new AtomicReference<Node<T>>(); 

    // Constructor 
    public Container(int capacity) { 
    this.capacity = capacity; 
    // Construct the list. 
    Node<T> h = new Node<T>(); 
    Node<T> it = h; 
    // One created, now add (capacity - 1) more 
    for (int i = 0; i < capacity - 1; i++) { 
     // Add it. 
     it.next = new Node<T>(); 
     // Step on to it. 
     it = it.next; 
    } 
    // Make it a ring. 
    it.next = h; 
    // Install it. 
    head.set(h); 
    } 

    // Empty ... NOT thread safe. 
    public void clear() { 
    Node<T> it = head.get(); 
    for (int i = 0; i < capacity; i++) { 
     // Trash the element 
     it.element = null; 
     // Mark it free. 
     it.free.set(true); 
     it = it.next; 
    } 
    // Clear stats. 
    resetStats(); 
    } 

    // Add a new one. 
    public Node<T> add(T element) { 
    // Get a free node and attach the element. 
    return getFree().attach(element); 
    } 

    // Find the next free element and mark it not free. 
    private Node<T> getFree() { 
    Node<T> freeNode = head.get(); 
    int skipped = 0; 
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free. 
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) { 
     skipped += 1; 
     freeNode = freeNode.next; 
    } 
    if (skipped < capacity) { 
     // Put the head as next. 
     // Doesn't matter if it fails. That would just mean someone else was doing the same. 
     head.set(freeNode.next); 
    } else { 
     // We hit the end! No more free nodes. 
     throw new IllegalStateException("Capacity exhausted."); 
    } 
    return freeNode; 
    } 

    // Mark it free. 
    public void remove(Node<T> it, T element) { 
    // Remove the element first. 
    it.detach(element); 
    // Mark it as free. 
    if (!it.free.compareAndSet(false, true)) { 
     throw new IllegalStateException("Freeing a freed node."); 
    } 
    } 

    // The Node class. It is static so needs the <T> repeated. 
    public static class Node<T> { 

    // The element in the node. 
    private T element; 
    // Are we free? 
    private AtomicBoolean free = new AtomicBoolean(true); 
    // The next reference in whatever list I am in. 
    private Node<T> next; 

    // Construct a node of the list 
    private Node() { 
     // Start empty. 
     element = null; 
    } 

    // Attach the element. 
    public Node<T> attach(T element) { 
     // Sanity check. 
     if (this.element == null) { 
     this.element = element; 
     } else { 
     throw new IllegalArgumentException("There is already an element attached."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    // Detach the element. 
    public Node<T> detach(T element) { 
     // Sanity check. 
     if (this.element == element) { 
     this.element = null; 
     } else { 
     throw new IllegalArgumentException("Removal of wrong element."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    @Override 
    public String toString() { 
     return element != null ? element.toString() : "null"; 
    } 
    } 

    // Provides an iterator across all items in the container. 
    public Iterator<T> iterator() { 
    return new UsedNodesIterator<T>(this); 
    } 

    // Iterates across used nodes. 
    private static class UsedNodesIterator<T> implements Iterator<T> { 
    // Where next to look for the next used node. 

    Node<T> it; 
    int limit = 0; 
    T next = null; 

    public UsedNodesIterator(Container<T> c) { 
     // Snapshot the head node at this time. 
     it = c.head.get(); 
     limit = c.capacity; 
    } 

    public boolean hasNext() { 
     if (next == null) { 
     // Scan to the next non-free node. 
     while (limit > 0 && it.free.get() == true) { 
      it = it.next; 
      // Step down 1. 
      limit -= 1; 
     } 
     if (limit != 0) { 
      next = it.element; 
     } 
     } 
     return next != null; 
    } 

    public T next() { 
     T n = null; 
     if (hasNext()) { 
     // Give it to them. 
     n = next; 
     next = null; 
     // Step forward. 
     it = it.next; 
     limit -= 1; 
     } else { 
     // Not there!! 
     throw new NoSuchElementException(); 
     } 
     return n; 
    } 

    public void remove() { 
     throw new UnsupportedOperationException("Not supported."); 
    } 
    } 

    @Override 
    public String toString() { 
    StringBuilder s = new StringBuilder(); 
    Separator comma = new Separator(","); 
    // Keep counts too. 
    int usedCount = 0; 
    int freeCount = 0; 
    // I will iterate the list myself as I want to count free nodes too. 
    Node<T> it = head.get(); 
    int count = 0; 
    s.append("["); 
    // Scan to the end. 
    while (count < capacity) { 
     // Is it in-use? 
     if (it.free.get() == false) { 
     // Grab its element. 
     T e = it.element; 
     // Is it null? 
     if (e != null) { 
      // Good element. 
      s.append(comma.sep()).append(e.toString()); 
      // Count them. 
      usedCount += 1; 
     } else { 
      // Probably became free while I was traversing. 
      // Because the element is detached before the entry is marked free. 
      freeCount += 1; 
     } 
     } else { 
     // Free one. 
     freeCount += 1; 
     } 
     // Next 
     it = it.next; 
     count += 1; 
    } 
    // Decorate with counts "]used+free". 
    s.append("]").append(usedCount).append("+").append(freeCount); 
    if (usedCount + freeCount != capacity) { 
     // Perhaps something was added/freed while we were iterating. 
     s.append("?"); 
    } 
    return s.toString(); 
    } 
} 
+0

我輕輕地拋棄了這種可能性,因爲饋線線程花費更多的時間在等待'把()'。然而這聽起來似乎合理。 'source'對象屬於第三方數據庫相關的庫 - 所有那些網絡代碼,*有*是一個拋出的InterruptedException,但頂級方法不會拋出它們......嘆氣,我*討厭*挖掘第三方代碼... – thkala 2012-02-13 01:09:54

+0

哦,大聲哭泣......誰寫這個庫,它吞下每一個拋出的InterruptedException!每一個!誰寫這樣的代碼? – thkala 2012-02-13 01:22:22

+0

順便說一下,將線程狀態設置爲中斷,只要該特定線程處於關閉狀態,就不會改變任何內容 - 一旦環路將線路直接切斷到終止狀態。我沒有得到任何例外處理,無論如何... – thkala 2012-02-13 01:30:42

1
 
private AtomicBoolean shutdown = new AtomicBoolean(); 

void shutdown() 
{ 
    shutdown.set(true); 
} 

while (!shutdown.get()) { 
    if (source.hasNext()) { 
     Object item = source.next(); 
     while (!shutdown.get() && !queue.offer(item, 100, TimeUnit.MILLISECONDS)) { 
      continue; 
     } 
    } 
    else { 
     break; 
    } 
} 
+0

1.我的代碼中的'Thread.interrupted()'調用應該無關緊要 - 當它返回時,線程正在進入終止狀態。 – thkala 2012-02-13 02:12:42

+0

2.好,趕上!簡化我的StackOverflow帖子的代碼時,我錯過了一個'break'語句。我已將其編輯回問題。 – thkala 2012-02-13 02:13:39

+0

3. I * do *想要在主線程終止之前很長時間停止饋線線程。不僅它不再有用,而且它消耗了大量資源 - 至少有一個數據庫連接和一個引用來阻止GC清除隊列。 – thkala 2012-02-13 02:15:33

相關問題