2017-04-13 152 views
3

我有要求將消息從一個ActiveMQ實例上的隊列移動到另一個ActiveMQ實例。有沒有辦法使用彈簧引導配置連接到兩個不同的ActiveMQ實例?spring boot配置多個ActiveMQ實例

我是否需要創建多個connectionFactories?如果是這樣,那麼JmsTemplate如何知道連接到哪個ActiveMQ實例?

@Bean 
    public ConnectionFactory connectionFactory() { 
     return new ActiveMQConnectionFactory(JMS_BROKER_URL); 
    } 

任何幫助和代碼示例都會很有用。

在此先感謝。 GM

回答

5

除了@Chris的響應 您必須使用不同的端口創建不同的BrokerService實例,並創建不同的ConnectionFactory以連接到每個代理,並使用這些不同的工廠創建不同的JmsTemplate向不同的代理髮送消息。

例如:

import javax.jms.ConnectionFactory; 
import javax.jms.QueueConnectionFactory; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.broker.BrokerService; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.Primary; 
import org.springframework.jms.config.DefaultJmsListenerContainerFactory; 
import org.springframework.jms.config.JmsListenerContainerFactory; 
import org.springframework.jms.core.JmsTemplate; 

@Configuration 
public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward { 
    public static final String LOCAL_Q = "localQ"; 
    public static final String REMOTE_Q = "remoteQ"; 

    @Bean 
    public BrokerService broker() throws Exception { 
     final BrokerService broker = new BrokerService(); 
     broker.addConnector("tcp://localhost:5671"); 
     broker.setBrokerName("broker"); 
     broker.setUseJmx(false); 
     return broker; 
    } 

    @Bean 
    public BrokerService broker2() throws Exception { 
     final BrokerService broker = new BrokerService(); 
     broker.addConnector("tcp://localhost:5672"); 
     broker.setBrokerName("broker2"); 
     broker.setUseJmx(false); 
     return broker; 
    } 

    @Bean 
    @Primary 
    public ConnectionFactory jmsConnectionFactory() { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5671"); 
     return connectionFactory; 
    } 

    @Bean 
    public QueueConnectionFactory jmsConnectionFactory2() { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5672"); 
     return connectionFactory; 
    } 

    @Bean 
    @Primary 
    public JmsTemplate jmsTemplate() { 
     JmsTemplate jmsTemplate = new JmsTemplate(); 
     jmsTemplate.setConnectionFactory(jmsConnectionFactory()); 
     jmsTemplate.setDefaultDestinationName(LOCAL_Q); 
     return jmsTemplate; 
    } 

    @Bean 
    public JmsTemplate jmsTemplate2() { 
     JmsTemplate jmsTemplate = new JmsTemplate(); 
     jmsTemplate.setConnectionFactory(jmsConnectionFactory2()); 
     jmsTemplate.setDefaultDestinationName(REMOTE_Q); 
     return jmsTemplate; 
    } 

    @Bean 
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory, 
      DefaultJmsListenerContainerFactoryConfigurer configurer) { 
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     configurer.configure(factory, connectionFactory); 
     return factory; 
    } 

    @Bean 
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(
      @Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory, 
      DefaultJmsListenerContainerFactoryConfigurer configurer) { 
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     configurer.configure(factory, connectionFactory); 
     return factory; 
    } 
} 

從一個AMQ例如將郵件移動到另一個實例可以使用JmsBridgeConnectors

注意,通過下面你的例子不能有多個消費者從隊列您希望轉發郵件,因爲Camel或JmsBridgeConnectors會使用該郵件並將其轉發。如果您希望轉發郵件的唯一副本,您可以使用以下解決方案: 1-將您的隊列轉換爲主題,通過持久訂閱或追溯使用者管理離線消費者的郵件。 2-將您的隊列轉換爲複合隊列並使用DestinationsInterceptors將消息複製到另一個隊列。 3-使用NetworkConnector爲Networkof經紀人

@Bean 
public BrokerService broker() throws Exception { 
    final BrokerService broker = new BrokerService(); 
    broker.addConnector("tcp://localhost:5671"); 
    SimpleJmsQueueConnector simpleJmsQueueConnector = new SimpleJmsQueueConnector(); 
    OutboundQueueBridge bridge = new OutboundQueueBridge(); 
    bridge.setLocalQueueName(LOCAL_Q); 
    bridge.setOutboundQueueName(REMOTE_Q); 
    OutboundQueueBridge[] outboundQueueBridges = new OutboundQueueBridge[] { bridge }; 
    simpleJmsQueueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE); 
    simpleJmsQueueConnector.setOutboundQueueBridges(outboundQueueBridges); 
    simpleJmsQueueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory()); 
    simpleJmsQueueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2()); 
    JmsConnector[] jmsConnectors = new JmsConnector[] { simpleJmsQueueConnector }; 
    broker.setJmsBridgeConnectors(jmsConnectors); 
    broker.setBrokerName("broker"); 
    broker.setUseJmx(false); 
    return broker; 
} 

或使用駱駝像這樣如下:

@Bean 
public CamelContext camelContext() throws Exception { 
    CamelContext context = new DefaultCamelContext(); 
    context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671")); 
    context.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672")); 
    context.addRoutes(new RouteBuilder() { 
     public void configure() { 
      from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q); 
     } 
    }); 
    context.start(); 
    return context; 
} 

您的生產者必須是這樣的使用型動物JmsTemplates:

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.jms.core.JmsTemplate; 
import org.springframework.stereotype.Component; 

@Component 
public class Producer implements CommandLineRunner { 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    @Autowired 
    @Qualifier("jmsTemplate2") 
    private JmsTemplate jmsTemplate2; 

    @Override 
    public void run(String... args) throws Exception { 
     send("Sample message"); 
    } 

    public void send(String msg) { 
     this.jmsTemplate.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q, msg); 
     this.jmsTemplate2.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, msg); 
    } 
} 

and Consumer:

import javax.jms.Session; 

import org.apache.activemq.ActiveMQSession; 
import org.springframework.jms.annotation.JmsListener; 
import org.springframework.stereotype.Component; 

@Component 
public class Consumer { 

    @JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2") 
    public void receiveQueue(Session session, String text) { 
     System.out.println(((ActiveMQSession) session).getConnection().getBrokerInfo()); 
     System.out.println(text); 
    } 
} 
+0

感謝這個詳細的答案,它給了我一個很好的開始。我設法讓它工作。我喜歡駱駝解決方案......它很好很簡單。謝謝你。 – user2279337

+0

@Hassen Bennour https://stackoverflow.com/questions/48582760/how-to-send-messages-to-multiple-active-mq-brokers-from-the-same-application我有你的解決方案的實施問題 – gstackoverflow

2

您將需要實例化多個JmsTemplate實例作爲Beans在你的應用程序,然後使用@Qualifier@Primary註釋的組合來指示JmsTemplate實例應該去的地方。

例如

@Bean("queue1") 
@Primary 
public JmsTemplate getQueue1(@Qualifier("connectionFactory1")ConnectionFactory factory...){ 
... 
} 

@Bean("queue2") 
@Primary 
public JmsTemplate getQueue2(@Qualifier("connectionFactory2")ConnectionFactory factory...){ 
... 
} 

... 

@Autowired 
@Qualifier("queue1") 
private JmsTemplate queue1; 
... 

更多信息,請參見here

+0

這給了我一個良好的開端。再次感謝。我還使用了來自hassen-bennour的示例代碼,擴展了您的解決方案。 – user2279337