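Spring Integration AMQP and RabbitMQ: lost and "unacked" messages
I am trying to create two simple applications: one publishes messages to a RabbitMQ channel, and the other receives them from the channel and prints them to the console. The sender application starts and immediately publishes 10 messages.
On the client console I see only about half of the messages printed. When I check the RabbitMQ web client, I also see that one of the messages always stays in the "Unacked" state.
From reading the documentation, my understanding is that the AMQP inbound/outbound gateways are a simple way to implement this. Can you help me understand why I am losing some messages and why one stays "unacked"? Also, how should I change the setup so that the other side receives all of the messages?
Thank you in advance.
Below are the XML configuration and source files for the sending side: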
integrationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/amqp
        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Configuration for Component Scan -->
    <context:component-scan base-package="com.amqp.sample" />

    <context:property-placeholder location="classpath*:rabbitmq.properties"/>

    <int:gateway id="taskGateway" service-interface="com.amqp.sample.TaskGateway" default-request-channel="processChannel" />

    <int-amqp:channel id="processChannel"
        connection-factory="connectionFactory"
        message-driven="true"
        queue-name="ha.rabbit.channel" />

    <!-- RabbitMQ Connection Factory -->
    <rabbit:connection-factory id="connectionFactory"
        addresses="${rabbitmq.addresses}"
        username="${rabbitmq.username}"
        password="${rabbitmq.password}" />

    <rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-timeout="-1" />

    <rabbit:admin connection-factory="connectionFactory" />

    <int-amqp:outbound-gateway request-channel="processChannel"
        reply-channel="processChannel"
        reply-timeout="-1" />
</beans>
TaskGateway.java
import org.springframework.messaging.Message;
import com.amqp.sample.model.Task;

public interface TaskGateway {
    void processTaskRequest(Message<Task> message);
}
Task.java
import java.io.Serializable;

public class Task implements Serializable {

    private static final long serialVersionUID = -2138235868650860555L;

    private int id;
    private String name;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Task(int id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "Task [id=" + id + ", name=" + name + "]";
    }
}
Application.java
@PropertySources({
    @PropertySource("classpath:application.properties"),
})
@EnableConfigurationProperties
@ComponentScan
@EnableAutoConfiguration
@ImportResource("classpath:integrationContext.xml")
public class Application extends SpringBootServletInitializer {

    public static final Logger logger = LoggerFactory.getLogger(Application.class);

    private static TaskGateway taskGateway;

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(Application.class, args);
        taskGateway = context.getBean(TaskGateway.class);
        // Publish 10 sample tasks as soon as the context is up.
        for (int i = 0; i < 10; i++) {
            Message<Task> message = MessageBuilder.withPayload(getTask(i)).build();
            taskGateway.processTaskRequest(message);
        }
    }

    /**
     * Creates a sample task with the given ID.
     *
     * @param id task identifier
     * @return Task
     */
    private static Task getTask(final int id) {
        return new Task(id, "Task with ID:" + id);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }
}
And here are the files for the receiving side:
integrationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/amqp
        http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Configuration for Component Scan -->
    <context:component-scan base-package="com.amqp.sample" />

    <context:property-placeholder location="classpath*:rabbitmq.properties"/>

    <!-- RabbitMQ Connection Factory -->
    <rabbit:connection-factory id="connectionFactory"
        addresses="${rabbitmq.addresses}"
        username="${rabbitmq.username}"
        password="${rabbitmq.password}" />

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

    <rabbit:admin connection-factory="connectionFactory" />

    <int:channel id="inputChannel"/>

    <int-amqp:inbound-gateway request-channel="inputChannel" reply-channel="inputChannel"
        queue-names="ha.rabbit.channel"
        connection-factory="connectionFactory"
        amqp-template="amqpTemplate"/>

    <int:service-activator input-channel="inputChannel" ref="taskProcessService" method="process" />
</beans>
ProcessService.java
import org.springframework.messaging.Message;

public interface ProcessService<T> {

    /**
     * Processes incoming message(s)
     *
     * @param message SI Message.
     */
    void process(Message<T> message);
}
TaskProcessService.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import com.amqp.sample.model.Task;

@Component("taskProcessService")
public class TaskProcessService implements ProcessService<Task> {

    private final Logger logger = LoggerFactory.getLogger(TaskProcessService.class);

    @Override
    public void process(Message<Task> message) {
        logger.info("Received Message : " + message.getPayload());
    }
}
Application.java
@PropertySources({
    @PropertySource("classpath:application.properties"),
})
@EnableConfigurationProperties
@ComponentScan
@EnableAutoConfiguration
@ImportResource("classpath:integrationContext.xml")
public class Application extends SpringBootServletInitializer {

    public static final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(Application.class, args);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }
}
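Thanks for your reply, Gary. Actually, I intend to log the received messages on the sender side later. That is one of the reasons I chose the gateway. Can you help me fix the gateway code? – turgos
The reply timeout on the client side is -1 ('RabbitTemplate'); your calling thread will wait forever for a reply that will never come. Once you fix that, I suggest you turn on DEBUG logging and follow the messages through the client and the server. I am not prepared to help debug a bogus configuration; sorry. –
I see. I will change the reply timeout and start debugging from there. In the meantime, do you have any simple Spring Integration AMQP sample code/configuration with RabbitMQ that uses gateways, which I could use as an example? Thanks again for your help. – turgos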
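For readers who only need one-way delivery, the following is a minimal sketch, not a verified fix. It assumes that no reply has to travel back to the sender, so the gateways are swapped for channel adapters. One likely reading of the posted configuration is that the AMQP-backed `processChannel` on the sender both publishes to and consumes from `ha.rabbit.channel`. That would make the sender compete with the receiver for the same messages, while the outbound gateway with `reply-timeout="-1"` holds one delivery unacknowledged as it waits for a reply. The fragments reuse the bean ids from the question (`connectionFactory`, `amqpTemplate`, `taskProcessService`). The `<rabbit:queue>` declaration is an addition for illustration and assumes a `<rabbit:admin>` is present.

```xml
<!-- Sender side (sketch): a plain in-memory channel feeding a one-way adapter.
     Assumption: no reply is expected, so there is no gateway and no reply timeout. -->
<int:gateway id="taskGateway"
    service-interface="com.amqp.sample.TaskGateway"
    default-request-channel="processChannel" />

<int:channel id="processChannel" />

<!-- No exchange-name is set, so the message goes to the default exchange,
     where the routing key is matched against the queue name. -->
<int-amqp:outbound-channel-adapter channel="processChannel"
    amqp-template="amqpTemplate"
    routing-key="ha.rabbit.channel" />

<!-- Receiver side (sketch): consume from the same queue, one way. -->
<!-- Assumption: the queue may not exist yet; omit this element if it is
     already declared elsewhere with different arguments. -->
<rabbit:queue name="ha.rabbit.channel" />

<int:channel id="inputChannel" />

<int-amqp:inbound-channel-adapter channel="inputChannel"
    queue-names="ha.rabbit.channel"
    connection-factory="connectionFactory" />

<int:service-activator input-channel="inputChannel"
    ref="taskProcessService" method="process" />
```

If a reply to the sender is needed later, as the comment above suggests, the gateways would return. In that case the service method would have to return a value, the request and reply channels would need to be distinct, and the reply timeout should be finite.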