2016-11-18 113 views
1

我試圖創建2個簡單的應用程序;一個是將消息發佈到RabbitMQ頻道,另一個是從頻道接收它並將其打印出控制檯。發件人應用程序立即啓動併發布10條消息。春季集成AMQP和RabbitMQ丟失和「unacked」消息

我在客戶端控制檯上看到的只有大約一半的消息打印出來。 當我檢查RabbitMQ Web客戶端時,我還會看到其中一條消息始終處於「未定義」狀態。

當我閱讀文檔時,據我瞭解「amqp入站/出站網關」是實現此目的的一種簡單方法。 你能幫我理解爲什麼我丟失了一些消息,並且一個人坐在「未吃」狀態? 另外,我應該如何改變它以獲得另一方的所有消息?

預先感謝您。

下面是XML配置和文件上發送側:

integrationContext.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp 
    http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/rabbit 
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd"> 

<!-- Configuration for Component Scan --> 
<context:component-scan base-package="com.amqp.sample" /> 

<context:property-placeholder location="classpath*:rabbitmq.properties"/> 

<int:gateway id="taskGateway" service-interface="com.amqp.sample.TaskGateway" default-request-channel="processChannel" /> 
<int-amqp:channel id="processChannel" 
    connection-factory="connectionFactory" 
    message-driven="true" 
    queue-name="ha.rabbit.channel" /> 

<!-- RabbitMQ Connection Factory --> 
<rabbit:connection-factory id="connectionFactory" 
    addresses="${rabbitmq.addresses}" 
    username="${rabbitmq.username}" 
    password="${rabbitmq.password}" /> 

<rabbit:template id="amqpTemplate" 
    connection-factory="connectionFactory" 
    reply-timeout="-1" /> 

<rabbit:admin connection-factory="connectionFactory" /> 


<int-amqp:outbound-gateway request-channel="processChannel" 
          reply-channel="processChannel" 
          reply-timeout="-1" /> 

</beans> 

TaskGateway.java

import org.springframework.messaging.Message; 

import com.amqp.sample.model.Task; 

public interface TaskGateway { 

    void processTaskRequest(Message<Task> message); 
} 

Task.java

import java.io.Serializable; 

public class Task implements Serializable { 

    private static final long serialVersionUID = -2138235868650860555L; 
    private int id; 
    private String name; 

    public int getId() { 
     return id; 
    } 

    public void setId(int id) { 
     this.id = id; 
    } 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public Task(int id, String name) { 
     this.id = id; 
     this.name = name; 
    } 

    @Override 
    public String toString() { 
     return "Task [id=" + id + ", name=" + name + "]"; 
    } 

} 

Application.Java

@PropertySources({ 
@PropertySource("classpath:application.properties"), 
}) 
@EnableConfigurationProperties 
@ComponentScan 
@EnableAutoConfiguration 
@ImportResource("classpath:integrationContext.xml") 
public class Application extends SpringBootServletInitializer { 
    public static final Logger logger = LoggerFactory.getLogger(Application.class); 

    private static TaskGateway taskGateway; 

    public static void main(String[] args) { 
    ApplicationContext context=SpringApplication.run(Application.class, args); 

    taskGateway = context.getBean(TaskGateway.class); 
    for(int i=0; i<10; i++){ 
     Message<Task> message = MessageBuilder.withPayload(getTask(i)).build(); 
     taskGateway.processTaskRequest(message); 
    } 
    } 

    /** 
    * Creates a sample task returns. 
    * 
    * @return Task 
    */ 
    private static Task getTask(final int id) { 
     return new Task(id, "Task with ID:" + id); 
    } 

    @Override 
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { 
    return application.sources(Application.class); 
    } 

} 

而且,這裏有對接收方的文件:

integrationContext.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp 
    http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/rabbit 
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd"> 

    <!-- Configuration for Component Scan --> 
    <context:component-scan base-package="com.amqp.sample" /> 

    <context:property-placeholder location="classpath*:rabbitmq.properties"/> 

    <!-- RabbitMQ Connection Factory --> 
    <rabbit:connection-factory id="connectionFactory" 
     addresses="${rabbitmq.addresses}" 
     username="${rabbitmq.username}" 
     password="${rabbitmq.password}" /> 

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> 
    <rabbit:admin connection-factory="connectionFactory" /> 

    <int:channel id="inputChannel"/> 

    <int-amqp:inbound-gateway request-channel="inputChannel" reply-channel="inputChannel" 
    queue-names="ha.rabbit.channel" 
    connection-factory="connectionFactory" 
    amqp-template="amqpTemplate"/> 

    <int:service-activator input-channel="inputChannel" ref="taskProcessService" method="process" /> 

</beans> 

ProcessService.java

import org.springframework.messaging.Message; 

public interface ProcessService<T> { 

    /** 
    * Processes incoming message(s) 
    * 
    * @param message SI Message. 
    */ 
    void process(Message<T> message); 

} 

TaskProcessService

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.messaging.Message; 
import org.springframework.stereotype.Component; 

import com.amqp.sample.model.Task; 

@Component("taskProcessService") 
public class TaskProcessService implements ProcessService<Task> { 

    private final Logger logger = LoggerFactory.getLogger(TaskProcessService.class); 

    @Override 
    public void process(Message<Task> message) { 

    logger.info("Received Message : " + message.getPayload()); 
    } 

} 

Application.java

@PropertySources({ 
@PropertySource("classpath:application.properties"), 
}) 
@EnableConfigurationProperties 
@ComponentScan 
@EnableAutoConfiguration 
@ImportResource("classpath:integrationContext.xml") 
public class Application extends SpringBootServletInitializer { 
    public static final Logger logger = LoggerFactory.getLogger(Application.class); 


    public static void main(String[] args) { 
    ApplicationContext context = SpringApplication.run(Application.class, args); 
    } 

    @Override 
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { 
    return application.sources(Application.class); 
    } 

} 

回答

0

首先,網關用於請求/回覆場景;由於您的客戶預計沒有響應,並且該服務沒有返回,您應該使用通道適配器而不是網關。嘗試一下,如果你仍然遇到麻煩,請回來。

編輯

@SpringBootApplication 
@IntegrationComponentScan 
public class So40680673Application { 

    public static void main(String[] args) { 
     ConfigurableApplicationContext context = SpringApplication.run(So40680673Application.class, args); 
     FooGate gate = context.getBean(FooGate.class); 
     for (int i = 0; i < 10; i++) { 
      System.out.println(gate.exchange("foo" + i)); 
     } 
     context.close(); 
    } 

    @MessagingGateway(defaultRequestChannel = "out.input") 
    public interface FooGate { 

     String exchange(String out); 
    } 

    @Bean 
    public IntegrationFlow out(AmqpTemplate amqpTemplate) { 
     return f -> f.handle(Amqp.outboundGateway(amqpTemplate).routingKey(queue().getName())); 
    } 

    @Bean 
    public IntegrationFlow in(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, queue().getName())) 
       .<String, String>transform(String::toUpperCase) 
       .get(); 
    } 

    @Bean 
    public Queue queue() { 
     return new AnonymousQueue(); 
    } 

} 
+0

謝謝你的回覆,加里。實際上,我打算稍後記錄發送方收到的消息。這是我選擇網關的原因之一。你能幫我解決網關的代碼嗎? – turgos

+0

客戶端的回覆超時時間爲-1('RabbitTemplate');你的調用線程將永遠等待一個永遠不會到來的答覆。一旦你解決了這個問題,我建議你打開DEBUG日誌記錄,並遵循客戶端和服務器中的消息。我不準備幫助調試僞造配置;抱歉。 –

+0

我明白了。我將更改回復超時並從那裏開始調試。同時,你是否有任何簡單的集成-amqp與rabbitmq示例代碼/配置使用網關,我可以用它作爲示例?再次感謝你的幫助。 – turgos