
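BlockingQueue.size() returns the wrong size in a publisher-subscriber setup

I have a question about implementing a publisher with multiple subscribers. The publisher uses a fixed-size buffer and queues messages. Those messages are sent to all subscribers. Subscribers must receive the messages in the same order in which they were published.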

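I use a BlockingQueue to hold the published messages (publisherQueue) and pass them on to each subscriber's BlockingQueue (subscriberQueue).

The problem is that buffering and the subscribers work correctly, but the buffer size (publisherQueue.size()) always returns 1.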

System.out.println("Actual number of messages in buffer: " + publisherQueue.size()); 

Here is my full code:

PublisherSubscriberService.java

package program; 

import java.util.HashSet; 
import java.util.Set; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 


public class PublisherSubscriberService { 
    private int buffer; 
    private int subscribersNumber; 
    static Set<Subscriber> subscribers = new HashSet<Subscriber>();

    public PublisherSubscriberService(int buffer, int subscribersNumber) {
     this.buffer = buffer;
     this.subscribersNumber = subscribersNumber;
    }

    public void addSubscriber(Subscriber subscriber) {
     subscribers.add(subscriber);
    }

    public void start() {
     Publisher publisher = new Publisher(buffer);
     System.out.println("publisher started the job");

     for (int i = 0; i < subscribersNumber; i++) {
      Subscriber subscriber = new Subscriber(buffer);
      subscriber.setName(Integer.toString(i + 1)); 
      subscribers.add(subscriber); 
      new Thread(subscriber).start(); 
      System.out.println("Subscriber " + subscriber.getName() + " started the job"); 
     } 
     new Thread(publisher).start(); 
    } 

    public class Publisher implements Runnable { 
     private int buffer; 
     final BlockingQueue<Message> publisherQueue; 

     public Publisher(int buffer) { 
      this.buffer = buffer; 
      publisherQueue = new LinkedBlockingQueue<>(buffer); 
     } 

     @Override 
     public void run() { 
      for (int i = 1; i < 100; i++) { 
       Message messageObject = new Message("" + i); 
       try { 
        Thread.sleep(50); 
        publisherQueue.put(messageObject); 
        System.out.println("Queued message no " +   messageObject.getMessage()); 
        System.out.println("Actual number of messages in buffer:  " + publisherQueue.size()); 
        for (Subscriber subscriber : subscribers) {
         subscriber.subscriberQueue.put(messageObject); 
        } 
        publisherQueue.take(); 
       } catch (InterruptedException e) { 
        System.out.println("Some error"); 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 

    class Subscriber implements Runnable { 
     private String name; 
     private int buffer; 
     final BlockingQueue<Message> subscriberQueue; 

     public Subscriber(int buffer) { 
      this.buffer = buffer; 
      subscriberQueue = new LinkedBlockingQueue<>(buffer); 
     } 

     public void setName(String name) { 
      this.name = name; 
     } 

     public String getName() { 
      return name; 
     } 

     @Override 
     public void run() { 
      try { 
       Message messageObject; 
       while (true) { 
        Thread.sleep(100); 
        messageObject = subscriberQueue.take(); 
        System.out.println(this.getName() + " got message: " + messageObject.getMessage()); 
       } 
      } catch (InterruptedException e) { 
       System.out.println("Some error"); 
       e.printStackTrace(); 
      } 
     } 
    } 
class Message { 
    private String message; 

    public Message(String str) { 
     this.message = str; 
    } 

    public String getMessage() { 
     return message; 
    } 

} 
} 

PublisherSubscriberProgram.java

package program; 

public class PublisherSubscriberProgram {

    public static void main(String[] args) {
     PublisherSubscriberService service = new PublisherSubscriberService(10, 3);
     service.start(); 
    } 
} 

Answer


Your publisher never has more than one item in its queue. On each pass through the loop, you put and then take a single item:

       publisherQueue.put(messageObject);   // the queue now holds exactly one item
       System.out.println("Queued message no " +   messageObject.getMessage());
       System.out.println("Actual number of messages in buffer:  " + publisherQueue.size());
       for (Subscriber subscriber : subscribers) {
        subscriber.subscriberQueue.put(messageObject);
       }
       publisherQueue.take();               // and it is removed again before the next put
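
To see the effect in isolation, here is a minimal single-threaded sketch of the same put, size(), take cycle. The class name PutTakeSizeDemo and the method observedSizes are made up for this illustration and are not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: reproduces only the put -> size() -> take cycle.
class PutTakeSizeDemo {

    // Records the size seen right after each put, exactly where the question prints it.
    static List<Integer> observedSizes(int capacity, int messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        List<Integer> sizes = new ArrayList<>();
        for (int i = 1; i <= messages; i++) {
            queue.put("message " + i); // queue goes from 0 to 1 element
            sizes.add(queue.size());   // so this always observes 1
            queue.take();              // back to 0 before the next iteration
        }
        return sizes;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(observedSizes(10, 5)); // prints [1, 1, 1, 1, 1]
    }
}
```

The capacity of 10 never matters here, because the same thread removes each element before adding the next one.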

With the code you've provided, there isn't even a point to having the publisher queue.
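
If the publisher queue is meant to act as a real buffer, one possible approach (an assumption on my part, not something the question's code does) is to let one thread only put into publisherQueue and have a separate dispatcher thread take from it and fan each message out to the subscriber queues. The sketch below uses hypothetical names (FanOutSketch, run, the STOP marker) and plain strings instead of the Message class. Because a single dispatcher forwards messages one at a time, every subscriber still sees them in publication order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: producer -> publisherQueue -> dispatcher -> subscriber queues.
class FanOutSketch {
    static final String STOP = "__STOP__"; // made-up end-of-stream marker

    // Returns what each subscriber received, in arrival order.
    static List<List<String>> run(int capacity, int subscriberCount, int messages)
            throws InterruptedException {
        BlockingQueue<String> publisherQueue = new LinkedBlockingQueue<>(capacity);
        List<BlockingQueue<String>> subscriberQueues = new ArrayList<>();
        List<List<String>> received = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();

        for (int s = 0; s < subscriberCount; s++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
            List<String> log = new ArrayList<>(); // written only by this subscriber's thread
            subscriberQueues.add(queue);
            received.add(log);
            threads.add(new Thread(() -> {
                try {
                    for (String m = queue.take(); !m.equals(STOP); m = queue.take()) {
                        log.add(m);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        // Dispatcher: the only thread that takes from publisherQueue.
        threads.add(new Thread(() -> {
            try {
                String m;
                do {
                    m = publisherQueue.take();
                    for (BlockingQueue<String> queue : subscriberQueues) {
                        queue.put(m); // blocks if that subscriber's queue is full
                    }
                } while (!m.equals(STOP)); // STOP is forwarded too, then the loop ends
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        for (Thread t : threads) {
            t.start();
        }

        // Producer (the calling thread): only puts, never takes, so
        // publisherQueue.size() can now exceed 1 when consumers lag behind.
        for (int i = 1; i <= messages; i++) {
            publisherQueue.put("message " + i);
        }
        publisherQueue.put(STOP);

        for (Thread t : threads) {
            t.join(); // after join, reading the logs from this thread is safe
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        for (List<String> log : run(10, 3, 20)) {
            System.out.println(log.size() + " messages, first: " + log.get(0));
        }
    }
}
```

With this split, a call to publisherQueue.size() in the producer would report however many messages are still waiting for the dispatcher. That value depends on thread timing, so it will vary from run to run.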
