2010-02-25 259 views
46

我想創建某種Producer/Consumer線程應用程序。但我不確定在兩者之間實現隊列的最佳方式。使用隊列的生產者/消費者線程

所以我有兩個想法(這可能是完全錯誤的)。我想知道哪個更好,如果他們都吸了,那麼實施隊列的最好方法是什麼。這主要是我在這些例子中執行的隊列,我很關心。我正在擴展一個Queue類,它是一個內部類,並且是線程安全的。下面是兩個例子,每個例子有4個類。

主要類 -

public class SomeApp 
{ 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     consumer = new Consumer(); 
     producer = new Producer(); 
    } 
} 

消費者接收機類

public class Consumer implements Runnable 
{ 
    public Consumer() 
    { 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = QueueHandler.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

生產者接收機類

public class Producer implements Runnable 
{ 
    public Producer() 
    { 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //add to the queue some sort of unique object 
      QueueHandler.enqueue(new Object()); 
     } 
    } 
} 

隊列接收機類

public class QueueHandler 
{ 
    //This Queue class is a thread safe (written in house) class 
    public static Queue<Object> readQ = new Queue<Object>(100); 

    public static void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(object); 
    } 

    public static Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

OR

主要類 -

public class SomeApp 
{ 
    Queue<Object> readQ; 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     readQ = new Queue<Object>(100); 
     consumer = new Consumer(readQ); 
     producer = new Producer(readQ); 
    } 
} 

消費者接收機類

public class Consumer implements Runnable 
{ 
    Queue<Object> queue; 

    public Consumer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = queue.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

生產者接收機類

public class Producer implements Runnable 
{ 
    Queue<Object> queue; 

    public Producer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 

     while(true) 
     { 
      //add to the queue some sort of unique object 
      queue.enqueue(new Object()); 
     } 
    } 
} 

隊列接收機類

//the extended Queue class is a thread safe (written in house) class 
public class QueueHandler extends Queue<Object> 
{  
    public QueueHandler(int size) 
    { 
     super(size); //All I'm thinking about now is McDonalds. 
    } 

    public void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(); 
    } 

    public Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

去吧!

+0

生產者enqueue和消費者出隊,順便說一句。反之亦然.. – 2010-02-25 08:22:25

+1

哦,不要從構造函數中啓動線程!該線程可以觀察處於不一致狀態的對象。有關詳細信息,請參閱「Java併發實踐」。 – 2010-02-25 08:24:12

+0

謝謝Zwei,入隊的事情是我沒有專注。從構造函數的東西開始線程我應該運行一個intiliazation方法,並在那裏啓動它,或者它應該從主方法類啓動它? – Gareth 2010-02-25 09:05:29

回答

69

Java 5+擁有您需要的所有工具。你會想要:

  1. 把你所有的生產者在一個ExecutorService;
  2. 把所有的消費者放在另一個ExecutorService;
  3. 如有必要,使用BlockingQueue在兩者之間進行通信。

我說「如果有必要」(3),因爲從我的經驗來看,這是一個不必要的步驟。你所做的就是向消費者執行者服務提交新任務。所以:

final ExecutorService producers = Executors.newFixedThreadPool(100); 
final ExecutorService consumers = Executors.newFixedThreadPool(100); 
while (/* has more work */) { 
    producers.submit(...); 
} 
producers.shutdown(); 
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
consumers.shutdown(); 
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 

所以producers直接提交consumers

+2

Cletus對錢的更多信息,以幫助澄清「從哪裏開始」 http://java.sun.com/docs/books/tutorial/essential/concurrency/ – edwardsmatt 2010-02-25 09:58:25

+0

「所以生產者直接提交給消費者」 - 是否可以平行調用consumers.submit(...)是否安全,還是應該同步呢? – Gevorg 2013-09-23 21:27:21

+0

如果您共享BlockingQueue,您是否可以爲生產者和消費者使用1個執行程序? – devo 2013-10-31 20:33:09

9

您正在重新發明輪子。

如果您需要持久性和其他企業功能使用JMS(我建議ActiveMq)。

如果您需要快速內存隊列,請使用java的Queue的其中一個阻礙。

如果您需要支持java 1.4或更早版本,請使用Doug Lea的優秀concurrent包。

+4

你仍然可以被要求在求職面試時實施生產者消費者:) – 2012-11-06 16:43:21

+0

我的確發現了java.util.concurrent中的實用程序有用,但我覺得很難稱它爲「優秀」,但它仍然迫使我通過兩個參數只是爲了指定超時。它是否會殺死Doug製作一個名爲Duration的課程? – Trejkaz 2014-03-03 22:24:03

17

OK,正如其他人注意,做的最好的事情就是用java.util.concurrent包。我強烈推薦「實踐中的Java併發」。這是一本很好的書,涵蓋了你需要知道的幾乎所有內容。

至於你的具體實現,正如我在評論中指出,不要從構造器線程 - 它可以是不安全的。

留下之外,第二實施似乎更好。你不想把隊列放在靜態字段中。你可能只是失去靈活性而已。

如果你想與自己的實現先走(用於學習目的,我猜?),至少提供一個start()方法。你應該構造對象(你可以實例化Thread對象),然後調用start()來啓動線程。

編輯:ExecutorService有自己的隊列,這可能會造成混淆。這裏的東西,讓你開始。

public class Main { 
    public static void main(String[] args) { 
     //The numbers are just silly tune parameters. Refer to the API. 
     //The important thing is, we are passing a bounded queue. 
     ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100)); 

     //No need to bound the queue for this executor. 
     //Use utility method instead of the complicated Constructor. 
     ExecutorService producer = Executors.newSingleThreadExecutor(); 

     Runnable produce = new Produce(consumer); 
     producer.submit(produce); 
    } 
} 

class Produce implements Runnable { 
    private final ExecutorService consumer; 

    public Produce(ExecutorService consumer) { 
     this.consumer = consumer; 
    } 

    @Override 
    public void run() { 
     Pancake cake = Pan.cook(); 
     Runnable consume = new Consume(cake); 
     consumer.submit(consume); 
    } 
} 

class Consume implements Runnable { 
    private final Pancake cake; 

    public Consume(Pancake cake){ 
     this.cake = cake; 
    } 

    @Override 
    public void run() { 
     cake.eat(); 
    } 
} 

進一步編輯: 對於生產者,而不是while(true),你可以這樣做:

@Override 
public void run(){ 
    while(!Thread.currentThread().isInterrupted()){ 
     //do stuff 
    } 
} 

這種方式可以通過調用.shutdownNow()關閉執行人。如果你使用while(true),它不會關閉。

還要注意的是Producer仍易受RuntimeExceptions(即一個RuntimeException將暫停處理)

+0

所以我應該給消費者和生產者添加一個start()方法?你是說我應該在我的主要方法中加入類似的東西嗎? consumer = new Consumer(); consumer.start(readQ); 還是這個? consumer = new Comsumer(readQ); consumer.start(); – Gareth 2010-02-25 09:35:21

+1

你通常會做新的Comsumer(readQ); consumer.start();.在你的情況下,最好聲明隊列私有final,如果你這樣做,你需要在構造函數中設置隊列。如果這是生產代碼,我強烈建議您使用cletus的答案。如果您絕對需要使用您的內部隊列,那麼您應該使用ExecutorService executor = Executors.newSingleThreadExecutor()而不是原始線程。除此之外,這將保護您免受RuntimeException停止系統。 – 2010-02-25 18:41:24

+0

謝謝。很有幫助。我已經和BlockingQueue一樣,像在內部隊列中建議的cletus一樣。仍然試圖讓我的頭在ExecutorService類中,但是當我這樣做時,我一定會使用它。謝謝你的幫助。 – Gareth 2010-02-26 08:32:23

1
  1. 已同步put和get方法的Java代碼「的BlockingQueue」。
  2. Java代碼「Producer」,生成器線程來生成數據。
  3. Java代碼「消費者」,消費者線程消費生成的數據。
  4. Java代碼「ProducerConsumer_Main」,主要功能是啓動生產者和消費者線程。

BlockingQueue.java

public class BlockingQueue 
{ 
    int item; 
    boolean available = false; 

    public synchronized void put(int value) 
    { 
     while (available == true) 
     { 
      try 
      { 
       wait(); 
      } catch (InterruptedException e) { 
      } 
     } 

     item = value; 
     available = true; 
     notifyAll(); 
    } 

    public synchronized int get() 
    { 
     while(available == false) 
     { 
      try 
      { 
       wait(); 
      } 
      catch(InterruptedException e){ 
      } 
     } 

     available = false; 
     notifyAll(); 
     return item; 
    } 
} 

Consumer.java

package com.sukanya.producer_Consumer; 

public class Consumer extends Thread 
{ 
    blockingQueue queue; 
    private int number; 
    Consumer(BlockingQueue queue,int number) 
    { 
     this.queue = queue; 
     this.number = number; 
    } 

    public void run() 
    { 
     int value = 0; 

     for (int i = 0; i < 10; i++) 
     { 
      value = queue.get(); 
      System.out.println("Consumer #" + this.number+ " got: " + value); 
     } 
    } 
} 

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer; 

public class ProducerConsumer_Main 
{ 
    public static void main(String args[]) 
    { 
     BlockingQueue queue = new BlockingQueue(); 
     Producer producer1 = new Producer(queue,1); 
     Consumer consumer1 = new Consumer(queue,1); 
     producer1.start(); 
     consumer1.start(); 
    } 
} 
+3

沒有解釋的代碼轉儲很少有幫助。 – Chris 2014-10-15 17:49:40

6

我有擴展Cletus提出了對工作代碼示例的回答。

  1. 其中一個ExecutorService(pes)接受Producer任務。
  2. 一個ExecutorService(ces)接受Consumer任務。
  3. 均爲ProducerConsumerBlockingQueue
  4. 多個Producer任務會生成不同的數字。
  5. 任何的Consumer任務可以消耗由Producer

代碼生成的數字:

import java.util.concurrent.*; 

public class ProducerConsumerWithES { 
    public static void main(String args[]){ 
     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>(); 

     ExecutorService pes = Executors.newFixedThreadPool(2); 
     ExecutorService ces = Executors.newFixedThreadPool(2); 

     pes.submit(new Producer(sharedQueue,1)); 
     pes.submit(new Producer(sharedQueue,2)); 
     ces.submit(new Consumer(sharedQueue,1)); 
     ces.submit(new Consumer(sharedQueue,2)); 
     // shutdown should happen somewhere along with awaitTermination 
     /* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */ 
     pes.shutdown(); 
     ces.shutdown(); 
    } 
} 
class Producer implements Runnable { 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.threadNo = threadNo; 
     this.sharedQueue = sharedQueue; 
    } 
    @Override 
    public void run() { 
     for(int i=1; i<= 5; i++){ 
      try { 
       int number = i+(10*threadNo); 
       System.out.println("Produced:" + number + ":by thread:"+ threadNo); 
       sharedQueue.put(number); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

class Consumer implements Runnable{ 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.sharedQueue = sharedQueue; 
     this.threadNo = threadNo; 
    } 
    @Override 
    public void run() { 
     while(true){ 
      try { 
       int num = sharedQueue.take(); 
       System.out.println("Consumed: "+ num + ":by thread:"+threadNo); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

輸出:

Produced:11:by thread:1 
Produced:21:by thread:2 
Produced:22:by thread:2 
Consumed: 11:by thread:1 
Produced:12:by thread:1 
Consumed: 22:by thread:1 
Consumed: 21:by thread:2 
Produced:23:by thread:2 
Consumed: 12:by thread:1 
Produced:13:by thread:1 
Consumed: 23:by thread:2 
Produced:24:by thread:2 
Consumed: 13:by thread:1 
Produced:14:by thread:1 
Consumed: 24:by thread:2 
Produced:25:by thread:2 
Consumed: 14:by thread:1 
Produced:15:by thread:1 
Consumed: 25:by thread:2 
Consumed: 15:by thread:1 

注。如果您不需要多個生產者和消費者,請保留單個生產者和消費者。我已經添加了多個生產者和消費者,以在多個生產者和消費者中展示BlockingQueue的功能。

+0

當多個生產者和消費者在場時,這並不關心競爭狀況。每個看到容量爲0並嘗試添加。使用單生產者和單消費者無需在BlockingQueue上同步,如果它不止一個,仍需要同步。 – Cleonjoys 2017-09-16 17:30:41

+0

你可以做一件事,註釋掉消費者,然後爲BlockingQueue設置固定尺寸,你會看到你自己。我用新的LinkedBlockingQueue (2)嘗試了您的代碼; 然後被輸出爲如下: 生產:11:通過螺紋:1 生產:21:通過螺紋:2 生產:22:通過螺紋:2 生產:12:通過螺紋:1 如何更當Queue的設置容量爲2時插入值 – Cleonjoys 2017-09-17 18:01:28

+0

這就是BlockingQueue的性質。除非有容量可用,否則將被阻止。我正在使用無界阻塞隊列,以上情況不會出現。即使它是由於BlockingQueue有界而產生的,它也是Java實現它的方式。請查看https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#put-E-。我的帖子中的代碼段沒有任何問題。 – 2017-09-17 18:19:48

1

這是一個非常簡單的代碼。

import java.util.*; 

// @author : rootTraveller, June 2017 

class ProducerConsumer { 
    public static void main(String[] args) throws Exception { 
     Queue<Integer> queue = new LinkedList<>(); 
     Integer buffer = new Integer(10); //Important buffer or queue size, change as per need. 

     Producer producerThread = new Producer(queue, buffer, "PRODUCER"); 
     Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER"); 

     producerThread.start(); 
     consumerThread.start(); 
    } 
} 

class Producer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize ; 

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super(ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.size() == queueSize){ 
        System.out.println(Thread.currentThread().getName() + " FULL   : waiting...\n"); 
        try{ 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then produce one, add and notify 
       int randomInt = new Random().nextInt(); 
       System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
       queue.add(randomInt); 
       queue.notifyAll(); //Important 
      } //synchronized ends here : NOTE 
     } 
    } 
} 

class Consumer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize; 

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super (ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.isEmpty()){ 
        System.out.println(Thread.currentThread().getName() + " Empty  : waiting...\n"); 
        try { 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then consume one and notify 
       System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove()); 
       queue.notifyAll(); 
      } //synchronized ends here : NOTE 
     } 
    } 
}