0

我寫了一個在生產者 - 消費者模型中使用SynchronousQueue的測試示例。但它運作不好。以下是我的代碼:如何在生產者 - 消費者模型中適當地使用SynchronousQueue?

public class QueueTest { 

    String input; 
    int pos; 
    BlockingQueue<String> queue; 
    volatile boolean exitFlag; 

    QueueTest() 
    { 
     for(int i=0; i<10000; i++) 
      input += "abcde"; 
     input += "X"; 
     pos = 0; 
     queue = new SynchronousQueue<String>(); 
     exitFlag = false; 
    } 

    public static void main(String[] args) { 
     QueueTest qtest = new QueueTest(); 
     qtest.runTest(); 
    } 

    void runTest() 
    { 
     Thread producer = new Thread(new Producer()); 
     Thread consumer = new Thread(new Consumer()); 
     producer.start(); 
     consumer.start(); 
     try { 
      producer.join(); 
      consumer.join(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    class Producer implements Runnable 
    { 
     public void run() 
     { 
      while(true) 
      { 
       String s = read(); 
       if(s.equals("X")) 
        break; 
       try { 
        queue.put(s); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
      exitFlag = true; 
     } 
    } 

    class Consumer implements Runnable 
    { 
     public void run() 
     { 
      while(exitFlag == false) 
      { 
       String s = null; 
       try { 
        s = queue.take(); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       process(s); 
      } 
     } 
    } 

    String read() 
    { 
     String str = input.substring(pos, pos+1); 
     pos++; 
     return str; 
    } 

    void process(String s) 
    { 
     long sum = 0; 
     for(long i=0; i<1000; i++) 
      sum = sum * i + i; 
    } 
} 

問題是運行被卡住就像死鎖。這些簡單代碼中是否存在任何錯誤?

回答

1

您更有可能看到競賽狀況。想象一下場景

Thread 1 put into queue 
Thread 2 takes out of queue quickly processes and awaits another put from thread 1 
Thread 1 finishes and sets exitFlag to true 

在這種情況下,線程2將永久坐,因爲exitFlag不是之前從中線程2讀取設置爲false。

你可能要考慮一個毒丸。這是我們完成的另一個線程的消息。例如:

final String POISON_PILL = " POISON"; 

class Producer implements Runnable { 
    public void run() { 
     while (true) { 
      String s = read(); 
      if (s.equals("X")) 
       break; 
      try { 
       queue.put(s); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     try { 
      queue.put(POISON_PILL); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

class Consumer implements Runnable { 
    public void run() { 
     String s = null; 
     try { 
      while ((s = queue.take()) != POISON_PILL) { 
       process(s); 
      } 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

所以當另一個線程通知其他線程已經完成時,兩個線程都應該優雅地結束。

+0

正試圖幫助OP實現解決方案...... – jtahlborn 2012-08-14 19:23:26

+0

這真是個好主意。我現在會嘗試。 – JackWM 2012-08-14 19:29:39

+0

它的工作原理。非常感謝!一個很好的教訓。 – JackWM 2012-08-14 19:33:30

0

由於您的exitFlag是在多個線程之間共享的,您必須執行一些操作以使Producer可以對消費者進行更新(根據Java內存模型)。在這個例子中,讓價值變化就足夠了。

更新:

您應該爲您的掛起代碼生成堆棧轉儲。這會給你一個線索,知道發生了什麼。這段代碼是一個很好的例子,爲什麼你不應該使用一個標誌來控制BlockingQueue。

更新2:

無賴,@JohnVint讓貓出來的袋。是的,毒丸是解決這種競爭條件的方法。

+0

好的建議,我加了,但問題是一樣的。 – JackWM 2012-08-14 19:08:08

+0

具體是什麼問題? – jtahlborn 2012-08-14 19:09:34

+0

它被卡住而沒有退出。 – JackWM 2012-08-14 19:10:14

0

你的程序將被卡在以下情形:

生產者只是消費者檢查existFlag是否真正後設置exitFlag(沒有把新的元素)。如果隊列中沒有更多元素(消費者設法處理之前的所有元素),則消費者將被阻止在queue.take()上。

您可以使用queue.poll(),這是沒有阻塞方法。這需要稍微改變你的程序。