2016-01-21 65 views
0

對於請求/響應,我需要一個臨時隊列來應答。我想創建一個隊列並始終保持打開狀態(而不是使用SessionCallback.doInJms()爲每個請求創建一個新的隊列)。如何在Spring中創建臨時JMS隊列?

我如何用Spring的JMS支持來做到這一點?

回答

1

我找不到辦法做到這一點,所以我創建了一個解決方法。這個類將保持會話和連接打開,直到上下文被銷燬。這樣,你可以確定你會得到每一個答覆。其他代碼通常會發送消息,打開回復隊列,然後有時會看不到回覆,因爲它是在發件人打開回復隊列之前發送的。

用法:

@Bean 
public JmsTemplate replyJmsTemplate() { 
    JmsTemplate result = new JmsTemplate(jmsConnectionFactory()); 
    result.setDefaultDestination(replyQueueProvider().getQueue()); 
    result.setReceiveTimeout(10000); 
    return result; 
} 

@Bean 
public QueueProvider replyQueueProvider() { 
    QueueProvider result = new QueueProvider(jmsConnectionFactory()); 
    result.init(); // Must call manually; no @PostConstruct! 
    return result; 
} 

實現:

import java.util.concurrent.atomic.AtomicInteger; 

import javax.annotation.PreDestroy; 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Queue; 
import javax.jms.Session; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.jms.UncategorizedJmsException; 
import org.springframework.jms.support.JmsUtils; 

public class QueueProvider { 

    private static final Logger log = LoggerFactory.getLogger(QueueProvider.class); 

    private static final AtomicInteger COUNT = new AtomicInteger(); 

    private final ConnectionFactory connectionFactory; 
    private String queueName; 
    private boolean isTemporary; 
    private Connection connection; 
    private Session session; 
    private Queue queue; 
    private boolean transacted; 
    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 

    public QueueProvider(ConnectionFactory connectionFactory, String queueName) { 
     this.connectionFactory = connectionFactory; 
     this.queueName = queueName; 
    } 

    public QueueProvider(ConnectionFactory connectionFactory) { 
     this.connectionFactory = connectionFactory; 
     this.isTemporary = true; 
     this.queueName = "TemporaryQueue-" + COUNT.incrementAndGet(); 
    } 

    public void setTransacted(boolean transacted) { 
     this.transacted = transacted; 
    } 

    public boolean getTransacted() { 
     return transacted; 
    } 

    public void setAcknowledgeMode(int acknowledgeMode) { 
     this.acknowledgeMode = acknowledgeMode; 
    } 

    public int getAcknowledgeMode() { 
     return acknowledgeMode; 
    } 

    public void init() { 
     try { 
      connection = connectionFactory.createConnection(); 
      connection.start(); 

      session = connection.createSession(transacted, acknowledgeMode); 

      log.debug("Opening queue {}", queueName); 
      if (isTemporary) { 
       queue = session.createTemporaryQueue(); 
      } else { 
       queue = session.createQueue(queueName); 
      } 
     } catch(Exception e) { 
      throw new UncategorizedJmsException("Error creating queue " + queueName, e); 
     } 
    } 

    @PreDestroy 
    public void close() { 
     log.debug("Closing queue {}", queueName); 
     queue = null; 
     JmsUtils.closeSession(session); 
     JmsUtils.closeConnection(connection); 
    } 

    public Queue getQueue() { 
     if(null == queue) { 
      throw new IllegalStateException("Either init() wasn't called or close() was already called"); 
     } 
     return queue; 
    } 
}