2016-07-29 62 views
2

我想了解聚合器的基礎知識。下面是使用情況下,我想實現:從隊列無法讓聚合器工作

1)讀消息(訂單詳細信息)。

<?xml version="1.0" encoding="UTF-8"?> 
<order xmlns="http://www.example.org/orders"> 
    <orderItem> 
    <isbn>12333454443</isbn> 
    <quantity>4</quantity> 
    </orderItem> 
    <orderItem> 
    <isbn>545656777</isbn> 
    <quantity>50</quantity> 
    </orderItem> 
.. 
.. 
</order> 

一個順序消息將包含多個OrderItem的。我們可以預期在隊列中有數百個訂單消息。

2)結束結果::

a)每個OrderItem的應寫入到文件中。

b)中這樣的文件應該被寫入唯一的文件夾。

舉個例子,假設我們有兩個爲了消息 - 每個包含三個OrderItem的

因此,我們需要創建2個文件夾:

在 「文件夾1」,應該有4個文件(1個OrderItem的在每個文件)

在 「文件夾2」,應該有2文件(每個文件中有1 orderitem)。這裏爲簡單起見,我們假設不再有訂單消息來了,我們可以在5分鐘後寫入。

實現:


  1. 我能夠讀取從隊列(的WebSphere MQ)的消息,併成功解組消息。
  2. 用於分流分裂基於OrderItem的計數數量的消息。
  3. 二手聚集到組中的4

我無法大小的消息得到聚合器按我的理解而努力。

  1. 我推一個順序當4 OrderItem的,該消息被正確地得到聚合。
  2. 我推一個訂單與5 orderitem,第4個是聚合,但最後一個被髮送到丟棄渠道。這是因爲MessageGroup被釋放,所以最後的消息被丟棄。
  3. 我推了兩個訂單,每個訂單包含2 orderitem。最後的2 orderitem被髮送到放棄渠道。
    關聯策略是硬編碼的(OrderAggregator。Java),但上述情況應該已經奏效。

需要關於如何實現這種使用情況的指針,我可以將它們組合到4中並寫入唯一文件夾。 請注意,orderitem都是獨立的書籍訂單,它們之間沒有任何關係。

下面是配置。

彈簧bean.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans"> 
    <int:channel id="mqInbound"/> 
    <int:channel id="item"/> 
    <int:channel id="itemList"/> 
    <int:channel id="aggregatorDiscardChannel"/> 

    <int-jms:message-driven-channel-adapter id="jmsIn" 
             channel="mqInbound" 
             destination="requestQueue" 
             message- converter="orderMessageConverter"/> 

    <int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/> 

    <int:chain id="aggregateList" input-channel="item" output-channel="itemList" > 
    <int:header-enricher> 
     <int:header name="sequenceSize" expression="4" overwrite="true"/> 
    </int:header-enricher> 
    <int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" /> 
    </int:chain> 

    <int:service-activator input-channel="itemList"     ref="displayAggregatedList" method="display"/> 
    <int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/> 

    <bean id="orderAggregator"  class="com.samples.Aggregator.OrderAggregator"/> 
    <bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/> 
    ... 
    .... 
</beans> 

OrderAggregator.java

public class OrderAggregator { 

@Aggregator 
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) { 

    return orderItemTypeList; 
} 

@CorrelationStrategy 
public String groupOrders(OrderItemType orderItemType) { 

    return "items"; 
} 

} 

DisplayAggregatedList.java

public class DisplayAggregatedList { 

public void display(List <OrderItemType> orderItemTypeList) { 

    System.out.println("######## Display Aggregated ##############"); 
    for(OrderItemType oit : orderItemTypeList) { 
     System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity()); 
    } 
} 
public void displayDiscarded(Message<?> message) { 

    System.out.println("######## Display Discarded ##############" + message); 
} 
} 

回答

1

你需要什麼叫expire-groups-upon-completion

設置爲true時(默認爲false),已完成的組將從消息存儲中刪除,從而允許具有相同關聯的後續消息組成新的組。默認行爲是將與完成組具有相同關聯性的消息發送到丟棄通道。

如果你需要釋放未完成的羣體反正(2個訂單左,例如),考慮使用group-timeouthttp://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to

+0

感謝@Artem Bilan爲您的迴應。聚合器似乎對我的代碼工作不一致。根據我推送的消息數量,有時可以使用。 但是,下面的消息序列是**不工作**。 ** 1)**訂購3條訂單消息。 (isbn#:001,002,003) ** 2)** 1訂單項的訂單郵件。 (isbn#:004) 消息應在第二條消息後發佈。然而,第二條消息(Isbn:004)在**丟棄通道**中可見。 –

+0

下面的scenerio作品: ** 1)** 5訂單項的訂單消息。 (isbn#:011,012,013,014,015) ** 2)** 5訂單項目的訂單消息。 (isbn#:016,017,018,019,020) ** 3)**具有2個訂單項的訂單消息。 (isbn#:021,022) **發佈了三個消息組**。組1(011,012,013,014),組2(015,016,017,018),組3(019,020,021,022)。 但是,由於某種原因,在我上次評論中共享的較早序列(3 orderitem + 1 orderitem)不起作用 –

+0

請使用'expire-groups-upon-completion =「true」並考慮在發佈時使用'MessageCountReleaseStrategy' -strategy' –

0

請使用過期團,在完成=「true」,並考慮到使用MessageCountReleaseStrategy`發佈策略 - Artem Bilan

+0

你應該接受我的答案,而不是 –