1
當生產者產生一條消息時,它必須被所有的消費者消費,然後只有生產者可以產生第二條消息,並且所有的消費者再次消費它,等等。 我試過編寫這段代碼,但沒有按照需求工作。任何人都可以幫忙嗎?製片人 - 消費者Scenerio。我有一個生產者和多個消費者
package Demo3;
import java.util.concurrent.Semaphore;
public class ConsumerProducerMonitor {
// produces items
public synchronized void put(String item,int itemNo,String threadName) {
if (isProduced) {
return;
}
this.itemNo = itemNo;
this.item=item;
System.out.println(isProduced+"hujj");
System.out.println("Producer " + threadName + " put Item: " + this.item);
if (this.itemNo == 0) {
isProduced = true;
System.out.println(isProduced);
this.notifyAll();
}
}
int flag=10;
private void consumeItems(String threadName) {
System.out.println("hre i m");
// for (int i = 0; i < 2; ++i) {
if (itemNo < 0)
return;
flag--;
// }
System.out.println("Consumer " + threadName + " consumed Items from " + this.item);
if (!sem.tryAcquire()) {
System.out.println("Failed to aquire semaphore for consumer: " + threadName);
}
}
// consumes item
public synchronized int get(String threadName) {
if (!isProduced) {
try {
this.wait();
} catch (InterruptedException e) {
System.out.println("Caught Interrupted Exceptino while waiting to consume item: " + e.getMessage());
}
}
if (flag == 0) {
// sem.release(NUM_SEMAPHORES);
return this.itemNo;
}
if (isConsuming) {
try {
this.wait();
isConsuming = true;
} catch (InterruptedException e) {
System.out.println("Caught Interrupted Exceptino while waiting to consume item: " + e.getMessage());
}
}
switch (sem.availablePermits()) {
case 1:
if (threadName.equals("C10")) {
System.out.println("reaching");
consumeItems(threadName);
if (threadName.equals("C10")) {
sem.release(NUM_SEMAPHORES);
}
}
break;
case 2:
if (threadName.equals("C9")) {
consumeItems(threadName);
}
break;
case 3:
if (threadName.equals("C8")) {
consumeItems(threadName);
}
break;
case 4:
if (threadName.equals("C7")) {
consumeItems(threadName);
}
break;
case 5:
if (threadName.equals("C6")) {
consumeItems(threadName);
}
break;
case 6:
if (threadName.equals("C5")) {
consumeItems(threadName);
}
break;
case 7:
if (threadName.equals("C4")) {
consumeItems(threadName);
}
break;
case 8:
if (threadName.equals("C3")) {
consumeItems(threadName);
}
break;
case 9:
if (threadName.equals("C2")) {
consumeItems(threadName);
}
break;
case 10:
if (threadName.equals("C1")) {
consumeItems(threadName);
}
break;
default:
break;
}
// isConsuming = false;
this.notifyAll();
return flag;
}
private static int NUM_SEMAPHORES =10;
private final Semaphore sem = new Semaphore(NUM_SEMAPHORES);
private boolean isProduced = false;
private boolean isConsuming = false;
String item;
int itemNo;
}
package Demo3;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
public class ConsumerProducer5 {
public static void main(String[] args) {
ConsumerProducer5 cp = new ConsumerProducer5();
cp.StartconsumerProducer();
}
public void StartconsumerProducer() {
ConsumerProducerMonitor mon = new ConsumerProducerMonitor();
List threads = new ArrayList();
Vector sharedQueue = new Vector();
// Create a producer
Thread p1 = new Thread(new Producer5(sharedQueue,mon,20), "P1");
p1.start();
// Create consumer 1
Thread c10 = new Thread(new Consumer5(mon,sharedQueue), "C10");
c10.start();
System.out.println("working");
// Create consumer 2
Thread c2 = new Thread(new Consumer5(mon,sharedQueue), "C2");
c2.start();
System.out.println("working321");
// Create consumer 3
Thread c3 = new Thread(new Consumer5(mon,sharedQueue), "C3");
c3.start();
Thread c4 = new Thread(new Consumer5(mon,sharedQueue), "C4");
c4.start();
Thread c5 = new Thread(new Consumer5(mon,sharedQueue), "C5");
c5.start();
Thread c6 = new Thread(new Consumer5(mon,sharedQueue), "C6");
c6.start();
Thread c7 = new Thread(new Consumer5(mon,sharedQueue), "C7");
c7.start();
Thread c8 = new Thread(new Consumer5(mon,sharedQueue), "C8");
c8.start();
Thread c9 = new Thread(new Consumer5(mon,sharedQueue), "C9");
c9.start();
Thread c1 = new Thread(new Consumer5(mon,sharedQueue), "C1");
c1.start();
threads.add(p1);
threads.add(c1);
threads.add(c2);
threads.add(c3);
threads.add(c4);
threads.add(c5);
threads.add(c6);
threads.add(c7);
threads.add(c8);
threads.add(c9);
threads.add(c10);
for (int i = 0; i < threads.size(); ++i) {
try {
((Thread)threads.get(i)).join(20000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
package Demo3;
import java.util.Vector;
public class Consumer5 implements Runnable {
private final Vector sharedQueue;
Consumer5(ConsumerProducerMonitor mon,Vector sharedQueue) {
this.mon = mon;
this.sharedQueue=sharedQueue;
}
@Override
public void run() {
System.out.println("coming hre");
int ret=1;
while (ret >= 1) {
ret = mon.get(Thread.currentThread().getName());
while (ret == 1) {
synchronized (sharedQueue) {
try {
sharedQueue.wait();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sharedQueue.notifyAll();
}
}
}
}
private final ConsumerProducerMonitor mon;
}
package Demo3;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Producer5 implements Runnable {
ConsumerProducerMonitor mon;
private final Vector sharedQueue;
private final int SIZE;
public Producer5(Vector sharedQueue,ConsumerProducerMonitor mon, int size) {
this.sharedQueue = sharedQueue;
this.SIZE = size;
this.mon=mon;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
System.out.println("Produced: " + i);
// mon=new ConsumerProducerMonitor();
try {
produce(i);
} catch (InterruptedException ex) {
Logger.getLogger(Producer5.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
private void produce(int i) throws InterruptedException {
//wait if queue is full
while (sharedQueue.size() == 1) {
synchronized (sharedQueue) {
System.out.println("Queue is full " + Thread.currentThread().getName()
+ " is waiting , size: " + sharedQueue.size());
sharedQueue.wait();
}
}
//producing element and notify consumers
synchronized (sharedQueue) {
sharedQueue.removeAllElements();
sharedQueue.add("Message No."+i);
mon.put(sharedQueue.get(i).toString(),i, Thread.currentThread().getName());
System.out.println(sharedQueue);
sharedQueue.notifyAll();
}
}
}
Oh Knuth,剛剛開始的一個新學期,不是嗎? – Voo