2011-12-21 80 views
6

在線程間通信時遇到問題,並通過在整個地方使用「虛擬消息」來「解決」它。這是一個壞主意嗎?有什麼解決方法?生產者 - 消費者線程間通信

示例我有問題。

主線程啓動一個處理線程並將記錄插入到數據庫中。 主線程讀取一個可能很大的文件,並將一個記錄(對象)放入另一個記錄(對象)中。處理線程從隊列中讀取並確實工作。

如何告訴「處理線程」停止? 隊列可以是空的,但工作沒有完成,並且主線程現在不會在處理線程完成工作並且不能中斷它時。

所以處理線程做

while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) { 
    MyObject object= queue.poll(100, TimeUnit.MILLISECONDS); 
    if (object != null) { 
     String data = object.getData(); 
     if (data.equals("END")) { 
      break; 
     } 
    // do work 
    } 
} 
// clean-up 
synchronized queue) { 
    queue.notifyAll(); 
} 
return; 

和主線程

// ...start processing thread... 
while(reader.hasNext(){ 
    // ...read whole file and put data in queue... 
} 
MyObject dummy = new MyObject(); 
dummy.setData("END"); 
queue.put(dummy); 
//Note: empty queue here means work is done 
while (queue.size() > 0) { 
    synchronized (queue) { 
     queue.wait(500); // over-cautios locking prevention i guess 
    } 
} 

注意,插入必須在同一個事務和事務不能由主線程來處理 。

這樣做會更好嗎? (我正在學習,不想以「錯誤的方式開始」)

回答

4

這些虛擬消息是有效的。它被稱爲「毒藥」。生產者發送給消費者的東西使其停止。

其他可能性是在主線程的某處調用Thread.interrupt(),並在工作線程中相應地捕獲並處理InterruptedException。

+0

好的。如果傳遞的對象的類型沒有可以輕易用於此的String字段,我將如何使用它?如果任何文本可能是有效的數據,因此可能等於毒藥?使用GUID? – 2011-12-22 14:01:28

+1

理想情況下,您可以更改MyObject類並在其中添加isPoison()方法。這種方法如何確定對象是否是毒藥取決於你,可能是一個簡單的事情,比如比較消息是否爲空,或者消息是否明確創建爲毒藥。一個想法是將MyObject重構爲兩個實現的接口,一個始終在isPoison()方法中返回false的MessageObject和一個始終返回true的PoisonMessage。但是,這很大程度上取決於你在做什麼,根據你的真實情況可能會有更好的解決方案。 – 2011-12-22 14:08:54

2

通過在各處使用「虛擬消息」「解決」它。這是一個 壞主意嗎?有什麼解決方法?

這不是一個壞主意,它被稱爲「毒丸」,是停止線程服務的合理方法。

但它只適用於知道生產者和消費者的數量。

在你發佈的代碼中,有兩個線程,一個是「主線程」,它產生數據,另一個是「處理線程」,它消耗數據,「毒丸」在這種情況下工作良好。但想象一下,如果您還有其他生產者,消費者如何知道何時停止(只有當所有生產者發送「毒丸」時),您都需要準確瞭解所有生產者的數量,並檢查消費者中的「毒丸」數量,如果它等於生產者的數量,這意味着所有的生產者停止工作,那麼消費者就會停止。

在「主線程」中,您需要找到InterruptedException,否則「主線程」可能無法設置「毒丸」。你可以不喜歡它的下方,

... 
try { 
    // do normal processing 
} catch (InterruptedException e) { /* fall through */ } 
finally { 
    MyObject dummy = new MyObject(); 
    dummy.setData("END"); 
    ... 
} 
... 

此外,您還可以嘗試使用ExecutorService爲您解決所有的問題。

(它的工作原理,當你只是需要做一些工作,然後停止,當所有人都完成)

void doWorks(Set<String> works, long timeout, TimeUnit unit) 
    throws InterruptedException { 
    ExecutorService exec = Executors.newCachedThreadPool(); 
    try { 
     for (final String work : works) 
      exec.execute(new Runnable() { 
        public void run() { 
         ... 
        } 
       }); 
    } finally { 
     exec.shutdown(); 
     exec.awaitTermination(timeout, unit); 
    } 
} 

我學習,不希望啓動「做了錯誤的方式」

您可能需要閱讀Book:Java Concurrency in Practice。相信我,這是最好的。

+0

我知道執行者服務,但讀取數據和處理(驗證)和插入不能在同一個任務或方法中,因爲它們都應該在同一個數據庫事務中運行。如果你明白我的意思。 – 2011-12-22 14:07:16

+0

好吧,我明白了,'ExecutorService'只是我的一般建議,根據您的要求使用它或不使用。但要注意捕捉所有例外情況,如果某些例外未被捕獲,您可能會失去設置「毒丸」的機會。 – 2011-12-22 14:24:22

0

你可以做什麼(我在最近的一個項目中做的)是包裝隊列,然後添加一個'isOpen()'方法。

class ClosableQ<T> { 

    boolean isOpen = true; 

    private LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>(); 

    public void put(T someObject) { 
     if (isOpen) { 
     lbq.put(someObject); 
     } 
    } 

    public T get() { 
     if (isOpen) { 
     return lbq.get(0); 
     } 
    } 

    public boolean isOpen() { 
     return isOpen; 
    } 

    public void open() { 
     isOpen = true; 
    } 

    public void close() { 
     isOpen = false; 
    } 
} 

所以你寫線程變成類似:

while (reader.hasNext()) { 
    // read the file and put it into the queue 
    dataQ.put(someObject); 
} 
// now we're done 
dataQ.close(); 

和讀取器線程:

while (dataQ.isOpen) { 
    someObject = dataQ.get(); 
} 

你當然可以擴展列表,而不是而是爲用戶提供了一個水平訪問你可能不想要的。你需要爲這個代碼添加一些併發的東西,比如AtomicBoolean。

+0

我將Queue傳遞給一個方法。讀你的想法讓我有以下幾點。該方法也可以接受AtomicBoolean keepProcessing。然後它可以運行,只要queue.size> 0 || keepProcessing.get() – 2011-12-22 15:07:31