2012-04-27 95 views
4

我正在爲我們的一個應用程序開發消息接口。應用程序是一種服務,它被設計爲接受「作業」,做一些處理,並返回結果(實際上是以文件的形式)。使用Spring與RabbitMQ集成

這個想法是使用RabbitMQ作爲消息傳遞基礎結構和Spring AMQP來處理協議特定的細節。

我不希望我的代碼與Spring AMQP緊密耦合,所以我想使用Spring Integration來隱藏消息傳遞API。所以基本上我想這樣的:發送到RabbitMQ的

消息====>春季AMQP ====> Spring集成====>爲MyService ====>回覆的RabbitMQ

一路我正在嘗試解決將這些連接在一起所需的XML配置,但我在抽象層次和術語方面存在問題。在Spring AMQP/RabbitMQ之上找到一個演示Spring集成的工作示例,事實證明這是非常困難的,儘管這種設置對我來說是非常「最佳實踐」的。

1)那麼..那麼一些聰明的靈魂在那裏可以快速瀏覽一下,也許可以將我推向正確的方向?我需要什麼?我不需要什麼? :-)

2)理想情況下,隊列應該是多線程的,這意味着taskExecutor應該將多個消息交給我的jobService進行並行處理。需要什麼配置?

<?xml version="1.0" encoding="UTF-8"?> 
    <beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:util="http://www.springframework.org/schema/util" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream" 
    xsi:schemaLocation=" 
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd 
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd 
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd 
    "> 

    <context:component-scan base-package="com.myprogram.etc" /> 

    <!-- Messaging infrastructure: RabbitMQ --> 

    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> 
     <constructor-arg value="${ei.messaging.amqp.servername}" /> 
     <property name="username" value="${ei.messaging.amqp.username}" /> 
     <property name="password" value="${ei.messaging.amqp.password}" /> 
    </bean> 

    <rabbit:connection-factory id="connectionFactory" /> 

    <rabbit:admin connection-factory="connectionFactory"/> 

    <!-- From RabbitMQ --> 

    <int-amqp:inbound-gateway request-channel="fromAMQP" reply-channel="toAMQP" queue-names="our-product-name-queue" connection-factory="connectionFactory"/> 

    <!-- Spring Integration configuration --> 

    <int:channel id="fromAMQP"> 
     <!-- Is this necessary?? --> 
     <int:queue/> 
    </int:channel> 

    <!-- JobService is a @Service with a @ServiceActivator annotation --> 
    <int:service-activator input-channel="fromAMQP" ref="jobService"/> 
</beans> 

回答

6

我儘可能多的去春來,集成和彈簧一體化,AMQP作爲你一個小白,我懷疑,但我沒有得到的東西的工作基於以下一個樣本項目的一部分。

對於RabbitMQ的基礎設施,我有以下幾點:

<rabbit:connection-factory id="rabbitConnectionFactory"/> 

<rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/> 

<rabbit:admin connection-factory="rabbitConnectionFactory"/> 

<!-- some attributes seemed to be ok with queue name, others required id 
    -- so I used both with the same value --> 
<rabbit:queue id='test.queue' name='test.queue'/> 

<rabbit:direct-exchange name:"my.exchange"> 
    <rabbit:bindings> 
     <rabbit:binding queue="test.queue" key="test.binding"/> 
    </rabbit:bindings> 
</rabbit:direct-exchange> 

要發送消息到RabbitMQ的,我有以下幾點:

<!-- This is just an interface definition, no implementation required 
    -- spring will generate an implementation which puts a message on the channel --> 
<int:gateway id="backgroundService", 
     service-interface="com.company.BackgroundService" 
      default-request-channel="toRabbit" 

<int:channel id:"toRabbit"/> 

<!-- used amqpTemplate to send messages on toRabbit channel to rabbitmq --> 
<int-amqp:outbound-channel-adapter channel:"toRabbit" 
           amqp-template="amqpTemplate" 
        exchange-name="my.exchange" 
        routing-key="test.binding"/> 

和接收消息我有以下幾點:

<int:service-activator input-channel="fromRabbit" 
         ref="testService" 
         method="serviceMethod"/> 


// from rabbitmq to local channel 
<int-amqp:inbound-channel-adapter channel="fromRabbit" 
            queue-names="test.queue" 
            connection-factory="rabbitConnectionFactory"/> 

<int:channel id="fromRabbit"/> 

一些注意事項 - amqp集成在spring-integration中的文檔說它是可能的e來同步發送和接收返回值,但我還沒有弄清楚。當我的service-activator方法返回一個值時,它會引發異常,並將消息放回rabbitmq(並生成一個無限循環,因爲它會再次接收消息並再次拋出異常)。

我BackgroundService interfacde看起來是這樣的:

package com.company 

import org.springframework.integration.annotation.Gateway 

public interface BackgroundService { 

    //@Gateway(requestChannel="someOtherMessageChannel") 
    public String sayHello(String toWho) 

} 

您可以通過註釋指定每個方法的通道,如果你不希望使用的春天豆配置的默認通道。

連接到服務激活的服務是這樣的:

package com.company; 

class TestService { 

    public void serviceMethod(String param) { 
    log.info("serviceMethod received: " + param"); 
    //return "hello, " + param; 
    } 
} 

當我擁有了一切本地有線時沒有涉及的RabbitMQ,返回值是正確的主叫方接收。當我去rabbitmq頻道,當返回一個值後拋出一個異常時,我得到了上述的無限循環。這肯定是可能的,否則它將不可能在不修改代碼的情況下在不同的通道中進行連線,但我不確定這是什麼技巧。如果你想出來,請回答一個解決方案。顯然,您可以根據需要在端點之間放置任何路由,轉換和過濾。

不要驚訝,如果我的XML摘錄上面有錯別字。我不得不從groovy DSL轉換回xml,所以我可能犯了錯誤。但意圖應該足夠清楚。

+0

我注意到咖啡樣本也有一個amqp-enabled配置,並從中找出了雙向(返回值)通信。而不是配置入站通道適配器和出站通道適配器,請配置int-amqp:入站網關和int-amqp:出站網關。您可以更改標籤名稱,並將上例中的所有屬性保留在上面(但將「通道」重命名爲「請求通道」)。你顯然可以指定一個特定的回覆通道,而不是使用默認的,但是當我嘗試這個時,我得到了一個錯誤。 – ideasculptor 2012-04-30 21:41:31

+0

嗨,你能使它工作嗎?我有類似的問題http://stackoverflow.com/questions/24037271/spring-batch-remote-chunking-doesnt-queue-messages-but-runs-it-locally – vishal 2014-06-06 19:42:40