2013-04-20 190 views
1

我有一個標準的生產者消費者問題。生產者將數據放入消費者使用的堆棧(緩衝區)中。 問題是,當生產者結束生產數據時,消費者不會死亡(並非總是)。消費者生產者多線程消費者不會消逝

我想使消費者模具生產結束了他for

for(int i = 0; i < 10; i++){ 
     try{ 
    //  sleep((int)(Math.random() * 1));     
     }catch(Exception e){e.printStackTrace();} 
     b.put((int) (Math.random()* 10)); 
     System.out.println("i = " + i); 
    } 
    b.stop(); 

後,所以後來我打電話b.stop()從而改變running領域Buffer爲false,notifiesAll()

末然後我得到:

i = 9 // number of iteration this is 10th iteration 
Consumer 2.: no data to take. I wait. Memory: 0 
Consumer 1.: no data to take. I wait. Memory: 0 
Consumer 3.: no data to take. I wait. Memory: 0 

他們應該死的話,所以我做了方法stop()但它d編號不起作用。

代碼運行,請檢查

import java.util.Stack; 


public class Buffer { 
private static int SIZE = 4; 
private int i;//number of elements in buffer 
public Stack<Integer> stack; 
private volatile boolean running; 
    public Buffer() { 
     stack = new Stack<>(); 
     running = true; 
     i = 0; 
    } 
    synchronized public void put(int val){ 
     while (i >= SIZE) { 
      try { 
       System.out.println("Buffer full, producer waits"); 
       wait(); 
      } catch (InterruptedException exc) { 
       exc.printStackTrace(); 
      } 
     } 
     stack.push(val);//txt = s; 
     i++; 
     System.out.println("Producer inserted " + val + " memory: " + i); 
     if(i - 1 == 0) 
      notifyAll(); 
     System.out.println(stack); 
    } 

    public synchronized Integer get(Consumer c) { 
     while (i == 0) { 
      try { 
       System.out.println(c + ": no data to take. I wait. Memory: " + i); 
       wait(); 
      } catch (InterruptedException exc) { 
       exc.printStackTrace(); 
      } 
     } 
     if(running){ 
      int data = stack.pop(); 
      i--;  
      System.out.println(c+ ": I took: " + data +" memory: " + i); 
      System.out.println(stack); 
      if(i + 1 == SIZE){//if the buffer was full so the producer is waiting 
       notifyAll(); 
       System.out.println(c + "I notified producer about it"); 
     } 
     return data;} 
     else 
      return null; 
    } 

    public boolean isEmpty(){ 
     return i == 0; 
    } 
    public synchronized void stop(){//I THOUGH THIS WOULD FIX IT~!!!!!!!!!!!!!! 
     running = false; 
     notifyAll(); 
    } 
    public boolean isRunning(){ 
     return running; 
    } 

} 

public class Producer extends Thread { 
private Buffer b; 
    public Producer(Buffer b) { 
     this.b = b; 
    } 

    public void run(){ 
     for(int i = 0; i < 10; i++){ 
      try{ 
     //  sleep((int)(Math.random() * 1));     
      }catch(Exception e){e.printStackTrace();} 
      b.put((int) (Math.random()* 10)); 
      System.out.println("i = " + i); 
     } 
     b.stop(); 
    } 

} 

public class Consumer extends Thread { 
    Buffer b; 
    int nr; 
    static int NR = 0; 

    public Consumer(Buffer b) { 
     this.b = b; 
     nr = ++NR; 
    } 

    public void run() { 
     Integer i = b.get(this); 
     while (i != null) { 
      System.out.println(nr + " I received : " + i); 
      i = b.get(this); 
     } 
     System.out.println("Consumer " + nr + " is dead"); 
    } 

    public String toString() { 
     return "Consumer " + nr + "."; 
} 

} 

public class Main { 

    public static void main(String[] args) { 

     Buffer b = new Buffer(); 
     Producer p = new Producer(b); 
     Consumer c1 = new Consumer(b); 
     Consumer c2 = new Consumer(b); 
     Consumer c3 = new Consumer(b); 
     p.start(); 
     c1.start();c2.start();c3.start(); 

    } 

} 
+0

請發佈main()方法以及 – NINCOMPOOP 2013-04-20 12:58:05

+0

完成....................... – Yoda 2013-04-20 12:58:49

回答

2

當緩衝區是空的,消費者開始等待。當它被通知時,它檢查緩衝區是否爲空,並且如果它仍然是空的,則開始等待。如果running標誌已被設置爲false,你不應該開始等待再次:

while (i == 0 && running) { 
    ... 
+0

很好。 stop()應該看起來像現在一樣嗎? \t'public synchronized void stop(){ \t \t running = false; \t \t notifyAll(); \t}'我認爲notifyAll不需要 – Yoda 2013-04-20 13:04:06

+0

是的。我會使用Thread.interrupt()並檢查中斷標誌,而不是使用運行標誌,但你看起來很好。 – 2013-04-20 13:05:15

0

了小修改,你run()Consumer

public void run() { 
    Integer i = b.get(this); 
    while (i != null && i!=10) { 
     System.out.println(nr + " I received : " + i); 
     i = b.get(this); 
    } 
    System.out.println("Consumer " + nr + " is dead"); 
} 
0

你實現了一個重大缺陷。變量i的修改不是線程安全的,這意味着你可以得到無法解釋的結果。它可能導致競爭狀態。

0
using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace ThreadsConsolApp 
{ 

     public sealed class ProducerConsumer 
     { 
      const int MagicNumber = 30;         // Indicates how many times to bounce between ping and pong threads 
      private Object m_lock = new Object();      // Lock to protect counter increment 
      private Queue<int> m_queue = new Queue<int>(); 

      // Ctor 
      public ProducerConsumer() 
      { 
      } 
      public void admin() 
      { 
       int i = 0; 
       lock (m_lock) 
       { 
        while (i <= MagicNumber) 
        { 
         Monitor.Wait(m_lock); //wait 
         Console.WriteLine("Controller = Produced " + i + " , Consumeed " + i); 
         Monitor.Pulse(m_lock); //release 
         i++; 
        } 
       } 
      } 
      // Ping 
      public void Producer() 
      { 
       int counter = 0; 

       lock (m_lock)            // Allows only one thread at a time inside m_lock 
       { 
        while (counter <= MagicNumber) 
        { 
         Thread.Sleep(500);        // Get data chunks from some source 
         Monitor.Wait(m_lock);       // Wait if the thread is busy. 'wait' will hold 
                     //this loop until something else pulses it to release the wait. 
         Console.WriteLine("producer {0}", counter); 
         m_queue.Enqueue(counter);//entring in queue 
         Monitor.Pulse(m_lock);       // Releases consumer thread 
         counter++; 
        } 
       } 
      } 

      public void Consumer() 
      { 
       lock (m_lock)           // Allows only one thread at a time inside m_lock 
       { 
        Monitor.Pulse(m_lock); 

        while (Monitor.Wait(m_lock,1000))     // Wait in the loop while producer is busy. Exit when producer times-out. 1000 = 1 second; ... 
                     //app will hang without this time-out value 
        { 
         int data = m_queue.Dequeue();//dispatch from queue 
         Console.WriteLine("consumer {0}", data); 
         Monitor.Pulse(m_lock);       // Release consumer 
         Console.WriteLine("====================="); 
        } 
       } 
      } 
     } 

     class Program 
     { 
      static void Main(string[] args) 
      { 
       ProducerConsumer app = new ProducerConsumer(); 

       // Create 2 threads 
       Thread t_producer = new Thread(new ThreadStart(app.Producer)); 
       Thread t_consumer = new Thread(new ThreadStart(app.Consumer)); 
       Thread t_admin = new Thread(new ThreadStart(app.admin)); 

       // Start threads 
       t_admin.Start(); 
       t_producer.Start(); 
       t_consumer.Start(); 

       // Waith for the threads to complete 
       t_admin.Join(); 
       t_producer.Join(); 
       t_consumer.Join(); 

       Console.WriteLine("\nPress any key to complete the program.\n"); 
       Console.ReadKey(false); 
      } 
     } 
    }