2016-01-22 313 views
1

當生產者產生一條消息時,它必須被所有的消費者消費;只有在這之後,生產者才可以產生第二條消息,然後所有的消費者再次消費它,依此類推。我試過編寫這段代碼,但它沒有按照需求工作。有人可以幫忙嗎?生產者-消費者場景(Producer-Consumer scenario):我有一個生產者和多個消費者。

package Demo3; 

    import java.util.concurrent.Semaphore; 

    /**
     * Monitor through which a producer publishes an item and a fixed group of
     * consumer threads named {@code C1} .. {@code C10} take turns consuming it.
     *
     * <p>The turn order is derived from the number of permits left in an internal
     * semaphore: with {@code p} permits left, only the thread named
     * {@code "C" + (NUM_SEMAPHORES - p + 1)} may consume. Each consumption takes
     * one permit; after the last consumer's turn the permits are restored.
     *
     * <p>All state is guarded by this object's intrinsic lock ({@link #put} and
     * {@link #get} are {@code synchronized}; the private helper is only called
     * from {@link #get}).
     *
     * <p>NOTE(review): {@code isProduced} is never cleared and {@code flag} is
     * never replenished, so as written only a single round of consumption is
     * served. Changing that requires a different protocol between this class and
     * its callers and is deliberately left untouched here.
     */
    public class ConsumerProducerMonitor {

        /** Number of consumers that take a turn on a published item. */
        private static final int NUM_SEMAPHORES = 10;

        /** Prefix of the consumer thread names the turn order is computed from. */
        private static final String CONSUMER_NAME_PREFIX = "C";

        /** Remaining permits encode whose turn it is; see the class comment. */
        private final Semaphore sem = new Semaphore(NUM_SEMAPHORES);

        /** Set once an item with number 0 has been published; never cleared. */
        private boolean isProduced = false;

        /** Most recently published item. */
        String item;

        /** Number of the most recently published item. */
        int itemNo;

        /** Consumptions still outstanding; counts down from 10 and is never reset. */
        int flag = NUM_SEMAPHORES;

        /**
         * Publishes an item. Ignored once {@code isProduced} is set. Consumers are
         * only woken when the published item number is 0.
         *
         * @param item       payload to publish
         * @param itemNo     sequence number of the payload
         * @param threadName name of the producing thread, used for logging only
         */
        public synchronized void put(String item, int itemNo, String threadName) {
            if (isProduced) {
                return;
            }

            this.itemNo = itemNo;
            this.item = item;
            System.out.println(isProduced + "hujj");
            System.out.println("Producer " + threadName + " put Item: " + this.item);

            if (this.itemNo == 0) {
                isProduced = true;
                System.out.println(isProduced);
                this.notifyAll();
            }
        }

        /**
         * Records one consumption by {@code threadName}: decrements {@code flag}
         * and takes one permit. Does nothing when the item number is negative.
         * Must be called with this object's lock held.
         */
        private void consumeItems(String threadName) {
            System.out.println("hre i m");
            if (itemNo < 0) {
                return;
            }

            flag--;
            System.out.println("Consumer " + threadName + " consumed Items from " + this.item);
            if (!sem.tryAcquire()) {
                System.out.println("Failed to aquire semaphore for consumer: " + threadName);
            }
        }

        /**
         * Blocks until an item has been published, then lets the caller consume it
         * if it is that caller's turn.
         *
         * @param threadName name of the calling consumer thread ({@code C1}..{@code C10})
         * @return the number of consumptions still outstanding; the current item
         *         number once nothing is outstanding; or {@code -1} if the calling
         *         thread was interrupted while waiting (its interrupt status is
         *         restored and nothing is consumed)
         */
        public synchronized int get(String threadName) {
            // Loop, not `if`: wait() may return without put() having published anything.
            while (!isProduced) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    System.out.println("Caught Interrupted Exceptino while waiting to consume item: " + e.getMessage());
                    // Preserve the interrupt for the caller and do not consume an
                    // item that was never published.
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }

            if (flag == 0) {
                return this.itemNo;
            }

            int permits = sem.availablePermits();
            boolean inRange = permits >= 1 && permits <= NUM_SEMAPHORES;
            if (inRange && threadName.equals(CONSUMER_NAME_PREFIX + (NUM_SEMAPHORES - permits + 1))) {
                boolean lastInRound = permits == 1;
                if (lastInRound) {
                    System.out.println("reaching");
                }
                consumeItems(threadName);
                if (lastInRound) {
                    // Hand the turn back to the first consumer.
                    sem.release(NUM_SEMAPHORES);
                }
            }

            this.notifyAll();
            return flag;
        }
    }





    package Demo3; 

    import java.util.ArrayList; 
    import java.util.List; 
    import java.util.Vector; 

    /**
     * Launcher: starts one producer thread and ten consumer threads that share a
     * single {@link ConsumerProducerMonitor} and a single queue, then waits a
     * bounded time for each of them.
     */
    public class ConsumerProducer5 {

        /** Upper bound, in milliseconds, on how long each thread is awaited. */
        private static final long JOIN_TIMEOUT_MILLIS = 20000;

        /** Consumer thread names, in the order the threads are started. */
        private static final String[] CONSUMER_START_ORDER =
                {"C10", "C2", "C3", "C4", "C5", "C6", "C7", "C8", "C9", "C1"};

        public static void main(String[] args) {
            ConsumerProducer5 cp = new ConsumerProducer5();
            cp.StartconsumerProducer();
        }

        /**
         * Creates and starts the producer {@code P1} and the consumers, then joins
         * every started thread with a timeout. If the calling thread is interrupted
         * while joining, its interrupt status is restored and the method returns
         * without waiting for the remaining threads.
         */
        public void StartconsumerProducer() {
            ConsumerProducerMonitor mon = new ConsumerProducerMonitor();
            Vector<String> sharedQueue = new Vector<>();
            List<Thread> threads = new ArrayList<>();

            Thread producer = new Thread(new Producer5(sharedQueue, mon, 20), "P1");
            producer.start();
            threads.add(producer);

            for (int i = 0; i < CONSUMER_START_ORDER.length; i++) {
                Thread consumer = new Thread(new Consumer5(mon, sharedQueue), CONSUMER_START_ORDER[i]);
                consumer.start();
                threads.add(consumer);

                // Progress output after the first and second consumer start.
                if (i == 0) {
                    System.out.println("working");
                } else if (i == 1) {
                    System.out.println("working321");
                }
            }

            for (Thread thread : threads) {
                try {
                    thread.join(JOIN_TIMEOUT_MILLIS);
                } catch (InterruptedException e) {
                    // Further joins would fail immediately; hand the interrupt on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }


package Demo3; 

import java.util.Vector; 

/**
 * Consumer task: repeatedly polls the monitor under the current thread's name
 * until the monitor returns a value below 1.
 *
 * <p>When the monitor returns exactly 1, the task first waits for one
 * notification on the shared queue and passes it on before polling again.
 * The previous version looped on {@code ret == 1} without ever re-reading
 * {@code ret}, so a thread that entered that loop could never leave it.
 *
 * <p>NOTE(review): the wait on the queue has no condition predicate, so a
 * notification sent before this thread starts waiting is lost; confirm whether
 * the producer side is expected to notify repeatedly.
 */
public class Consumer5 implements Runnable {
    private final ConsumerProducerMonitor mon;

    /** Only used as a wait/notify monitor here; its contents are never read. */
    private final Vector<?> sharedQueue;

    Consumer5(ConsumerProducerMonitor mon, Vector<?> sharedQueue) {
        this.mon = mon;
        this.sharedQueue = sharedQueue;
    }

    /**
     * Runs the polling loop described on the class. Returns early, with the
     * thread's interrupt status restored, if interrupted while waiting on the
     * shared queue.
     */
    @Override
    public void run() {
        System.out.println("coming hre");
        String consumerName = Thread.currentThread().getName();
        int ret = 1;
        while (ret >= 1) {
            ret = mon.get(consumerName);
            if (ret == 1) {
                synchronized (sharedQueue) {
                    try {
                        sharedQueue.wait();
                    } catch (InterruptedException e) {
                        // Treat interruption as a request to stop this consumer.
                        Thread.currentThread().interrupt();
                        return;
                    }
                    sharedQueue.notifyAll();
                }
            }
        }
    }
}

package Demo3; 

import java.util.Vector; 
import java.util.logging.Level; 
import java.util.logging.Logger; 



/**
 * Producer task: publishes twenty numbered messages, one at a time, through a
 * one-slot shared queue and a {@link ConsumerProducerMonitor}.
 */
public class Producer5 implements Runnable {

    /** Number of messages {@link #run()} attempts to publish. */
    private static final int MESSAGE_COUNT = 20;

    ConsumerProducerMonitor mon;
    private final Vector<Object> sharedQueue;

    // NOTE(review): stored but never read; it is unclear whether it was meant
    // as the message count or as the queue capacity - confirm before using it.
    private final int SIZE;

    /**
     * @param sharedQueue queue that holds the current message and doubles as the
     *                    wait/notify monitor shared with the consumers
     * @param mon         monitor the message is handed to
     * @param size        currently unused (see the note on the field)
     */
    // The parameter stays a raw Vector so existing callers compile unchanged;
    // this class only ever adds Strings to it.
    @SuppressWarnings("unchecked")
    public Producer5(Vector sharedQueue, ConsumerProducerMonitor mon, int size) {
        this.sharedQueue = sharedQueue;
        this.SIZE = size;
        this.mon = mon;
    }

    /**
     * Publishes messages 0 .. 19 in order. Stops early, with the thread's
     * interrupt status restored, if interrupted while waiting for the queue.
     */
    @Override
    public void run() {
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            System.out.println("Produced: " + i);
            try {
                produce(i);
            } catch (InterruptedException ex) {
                Logger.getLogger(Producer5.class.getName()).log(Level.SEVERE, null, ex);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Waits until the queue no longer holds a message, then replaces its content
     * with message {@code i}, hands the message to the monitor and notifies
     * everyone waiting on the queue.
     *
     * <p>NOTE(review): nothing in this class removes the message again; the wait
     * below only ends once some other party empties the queue - confirm that a
     * consumer does so.
     *
     * @param i sequence number of the message
     * @throws InterruptedException if interrupted while waiting for the queue
     */
    private void produce(int i) throws InterruptedException {
        synchronized (sharedQueue) {
            // Check under the lock so the state cannot change between the test
            // and the wait, and re-check after every wakeup.
            while (sharedQueue.size() == 1) {
                System.out.println("Queue is full " + Thread.currentThread().getName()
                        + " is waiting , size: " + sharedQueue.size());
                sharedQueue.wait();
            }

            String message = "Message No." + i;
            sharedQueue.removeAllElements();
            sharedQueue.add(message);
            // The queue holds exactly one element here, so it must not be
            // indexed by i; pass the message that was just enqueued.
            mon.put(message, i, Thread.currentThread().getName());
            System.out.println(sharedQueue);
            sharedQueue.notifyAll();
        }
    }

}
+0

Oh Knuth,剛剛開始的一個新學期,不是嗎? – Voo

回答

-1

代碼中有很多東西是不需要的。我們可以用更少的手段完成任務。

基本上這裏的概念是生產者生產產品,通知所有的消費者,然後等待自己。

另一方面,每個消費者消費產品後進入等待;最後一個消費該產品的消費者會發現自己是最後一個,於是通知生產者開始生產下一個產品。

下面是處理您的要求的代碼。

import java.util.LinkedList; 
import java.util.Queue; 
import java.util.concurrent.atomic.AtomicInteger; 

/**
 * One producer, many consumers: every produced item is consumed exactly once
 * by every registered consumer before the producer creates the next item.
 *
 * <p>All coordination happens on the intrinsic lock of {@link #sharedQueue};
 * every wait is re-checked in a loop against an explicit condition.
 */
public class ProducerConsumerMultiple {

    /** Holds the item currently being consumed; also the shared lock object. */
    static Queue<String> sharedQueue = new LinkedList<>();

    public static void main(String args[]) {

        Producer producer = new Producer("Producer1");
        Consumer consumer1 = new Consumer("Consumer1");
        Consumer consumer2 = new Consumer("Consumer2");
        Consumer consumer3 = new Consumer("Consumer3");
        Consumer consumer4 = new Consumer("Consumer4");
        Consumer consumer5 = new Consumer("Consumer5");
        new Thread(producer).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
        new Thread(consumer3).start();
        new Thread(consumer4).start();
        new Thread(consumer5).start();
    }

    /** Produces items named {@code item1}, {@code item2}, ... one at a time. */
    public static class Producer implements Runnable {
        String producerName;
        AtomicInteger itemNameCounter = new AtomicInteger(0);

        public Producer(String producerName) {
            this.producerName = producerName;
        }

        /**
         * Adds the next item to the queue, wakes the consumers and blocks until
         * the item has been removed from the queue again.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public void produce() throws InterruptedException {
            synchronized (sharedQueue) {
                String item = "item" + itemNameCounter.incrementAndGet();
                System.out.println(producerName + " has produced " + item);
                sharedQueue.add(item);
                sharedQueue.notifyAll();
                // Only an empty queue means the item is done; any other wakeup
                // must not trigger the next item.
                while (!sharedQueue.isEmpty()) {
                    sharedQueue.wait();
                }
            }
        }

        /** Produces until interrupted; the interrupt status is restored on exit. */
        @Override
        public void run() {
            try {
                while (true) {
                    produce();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Consumes every item once. A consumer is counted from the moment it is
     * constructed, so every constructed consumer must also be started;
     * otherwise the item waits for it forever.
     */
    public static class Consumer implements Runnable {
        String consumerName;

        /** Number of registered consumers. */
        static AtomicInteger totalConsumersCount = new AtomicInteger(0);

        /** Consumers that have consumed the current item; guarded by sharedQueue. */
        static int consumptionCountForItem = 0;

        /** Items consumed by everyone and removed so far; guarded by sharedQueue. */
        private static long completedItemCount = 0;

        /** Index of the next item this consumer may take; guarded by sharedQueue. */
        private long nextItemIndex;

        public Consumer(String consumerName) {
            this.consumerName = consumerName;
            // Register here rather than in run(): a consumer whose thread has not
            // been scheduled yet must already count towards the quorum, or the
            // early starters complete an item without it.
            synchronized (sharedQueue) {
                totalConsumersCount.incrementAndGet();
                nextItemIndex = completedItemCount;
            }
        }

        /**
         * Blocks until there is an item this consumer has not consumed yet,
         * consumes it, and - if this was the last outstanding consumer - removes
         * the item and wakes the producer.
         *
         * @throws InterruptedException if interrupted while waiting
         */
        public void consume() throws InterruptedException {

            synchronized (sharedQueue) {
                while (sharedQueue.isEmpty() || nextItemIndex != completedItemCount) {
                    sharedQueue.wait();
                }

                System.out.println(consumerName + " has " + "consumed " + sharedQueue.peek());
                nextItemIndex++;

                if (++consumptionCountForItem >= totalConsumersCount.get()) {
                    sharedQueue.remove();
                    consumptionCountForItem = 0;
                    completedItemCount++;
                    sharedQueue.notifyAll();
                }
            }
        }

        /**
         * Consumes until interrupted, then deregisters and restores the interrupt
         * status.
         *
         * <p>NOTE(review): a consumer leaving in the middle of an item does not
         * re-evaluate the quorum for that item; nothing in this class interrupts
         * consumers, so this path is currently not exercised.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    consume();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                totalConsumersCount.decrementAndGet();
            }
        }
    }
}