2017-05-23 25 views
0

背景: 我有一個標準的生產者消費者隊列,消費者緩慢,而生產者速度很快。期望是每當生產者完成所請求的消息時,它就認可該消息,並且生產者將假定與消息相關的任務完成。由於生產者速度很快,我不希望生產線等待,相反,只要消息被確認,就應該調用回調。由於JMS在這方面受到限制,並且我儘可能直接使用了像ActiveMQMessageProducer這樣的ActiveMQ類。ActiveMQ實現異步確認JAVA 8

問題: 消息正在自動確認,即使Consumer尚未啓動,註冊的異步回調也會被調用。 public void send(Destination destination, Message message, AsyncCallback onComplete)

生產者

public static boolean setup() {  
     Producer.connectionFactory = new 
     ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 
     // Create a Connection 
     Producer.connection = 
      (ActiveMQConnection)connectionFactory.createConnection(); 
     connection.setAlwaysSessionAsync(true); 
     connection.start();   
    } 

public Producer() { 
     session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
     destination = (ActiveMQDestination)session.createQueue("TEST.FOO"); 
     producer = (ActiveMQMessageProducer)session.createProducer(destination); 
     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
     } 
... 

public void run() { 
     long id = messageID.getAndIncrement();   
     String text = "Hello world!" 
     Message message = session.createTextMessage(text); 
     producer.send(message, new MessageCompletion(id, this.messageRundown)); 
    } 

消費者

public static boolean setup() {  
    Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");  
    Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection(); 
    connection.setAlwaysSessionAsync(true);   
    return true; 
} 

public Consumer() { 
    session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
    destination = (ActiveMQDestination)session.createQueue("TEST.FOO"); 
    consumer = (ActiveMQMessageConsumer)session.createConsumer(destination); 
    consumer.setMessageListener(this); 
    connection.start(); 
} 

// implements MessageListener 
@Override 
public void onMessage(Message message) {  
    messageQueue.add(message); 
} 
public void run() { 
    while(true) { 
     Message message = messageQueue.poll(); 
     while(message != null) { 
      // do some work    
      message.acknowledge(); 
      message = messageQueue.poll();    
     } 
     Thread.sleep(10000);    
    } 
} 

雖然不需要消費者我將它作爲參考,東西已被刪除,以確保簡潔,這是一部分工作代碼。

回答

1

您對承認方式的理解是錯誤的。發件人的異步回調僅告訴您代理已收到該消息。如果它是一個持久發送,回調將表明該消息也被寫入磁盤。

在JMS或大多數其他消息傳遞代理中,生產者和消費者沒有耦合。生產者在隊列中放置消息,然後消費者可以隨時前來並從該隊列中消費。兩者之間沒有耦合,生產者在繼續產生下一條消息之前不能等待消費者。

如果你想知道何時處理特定的消息,所以你可以扼殺工作,那麼你想看看JMS Request/Response風格的消息模式。

+0

謝謝!我現在明白了這一點,異步回調實現的JMSException處理程序現在也加起來了。因此,對於高性能,持久的消息隊列,保持異步以確保我們不會在這方面花費太多時間仍然是有意義的。 – amritanshu

+0

對於持久性消息,您無需權衡是否想知道每條消息實際上何時會保持持久,如果有任何性能提升,就會獲得很小的消息。 –