我想了解聚合器的基礎知識。下面是使用情況下,我想實現:從隊列無法讓聚合器工作
1)讀消息(訂單詳細信息)。
<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
<orderItem>
<isbn>12333454443</isbn>
<quantity>4</quantity>
</orderItem>
<orderItem>
<isbn>545656777</isbn>
<quantity>50</quantity>
</orderItem>
..
..
</order>
一個順序消息將包含多個OrderItem的。我們可以預期在隊列中有數百個訂單消息。
2)結束結果::
a)每個OrderItem的應寫入到文件中。
b)中這樣的文件應該被寫入唯一的文件夾。
舉個例子,假設我們有兩個爲了消息 - 每個包含三個OrderItem的。
因此,我們需要創建2個文件夾:
在 「文件夾1」,應該有4個文件(1個OrderItem的在每個文件)
在 「文件夾2」,應該有2文件(每個文件中有1 orderitem)。這裏爲簡單起見,我們假設不再有訂單消息來了,我們可以在5分鐘後寫入。
實現:
- 我能夠讀取從隊列(的WebSphere MQ)的消息,併成功解組消息。
- 用於分流分裂基於OrderItem的計數數量的消息。
- 二手聚集到組中的4
我無法大小的消息得到聚合器按我的理解而努力。
- 我推一個順序當4 OrderItem的,該消息被正確地得到聚合。
- 我推一個訂單與5 orderitem,第4個是聚合,但最後一個被髮送到丟棄渠道。這是因爲MessageGroup被釋放,所以最後的消息被丟棄。
- 我推了兩個訂單,每個訂單包含2 orderitem。最後的2 orderitem被髮送到放棄渠道。
關聯策略是硬編碼的(OrderAggregator。Java),但上述情況應該已經奏效。
需要關於如何實現這種使用情況的指針,我可以將它們組合到4中並寫入唯一文件夾。 請注意,orderitem都是獨立的書籍訂單,它們之間沒有任何關係。
下面是配置。
彈簧bean.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
<int:channel id="mqInbound"/>
<int:channel id="item"/>
<int:channel id="itemList"/>
<int:channel id="aggregatorDiscardChannel"/>
<int-jms:message-driven-channel-adapter id="jmsIn"
channel="mqInbound"
destination="requestQueue"
message- converter="orderMessageConverter"/>
<int:splitter input-channel="mqInbound" output-channel="item" expression="payload.orderItem"/>
<int:chain id="aggregateList" input-channel="item" output-channel="itemList" >
<int:header-enricher>
<int:header name="sequenceSize" expression="4" overwrite="true"/>
</int:header-enricher>
<int:aggregator correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders" discard-channel="aggregatorDiscardChannel" />
</int:chain>
<int:service-activator input-channel="itemList" ref="displayAggregatedList" method="display"/>
<int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>
<bean id="orderAggregator" class="com.samples.Aggregator.OrderAggregator"/>
<bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
...
....
</beans>
OrderAggregator.java
public class OrderAggregator {
@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {
return orderItemTypeList;
}
@CorrelationStrategy
public String groupOrders(OrderItemType orderItemType) {
return "items";
}
}
DisplayAggregatedList.java
public class DisplayAggregatedList {
public void display(List <OrderItemType> orderItemTypeList) {
System.out.println("######## Display Aggregated ##############");
for(OrderItemType oit : orderItemTypeList) {
System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
}
}
public void displayDiscarded(Message<?> message) {
System.out.println("######## Display Discarded ##############" + message);
}
}
感謝@Artem Bilan爲您的迴應。聚合器似乎對我的代碼工作不一致。根據我推送的消息數量,有時可以使用。 但是,下面的消息序列是**不工作**。 ** 1)**訂購3條訂單消息。 (isbn#:001,002,003) ** 2)** 1訂單項的訂單郵件。 (isbn#:004) 消息應在第二條消息後發佈。然而,第二條消息(Isbn:004)在**丟棄通道**中可見。 –
下面的scenerio作品: ** 1)** 5訂單項的訂單消息。 (isbn#:011,012,013,014,015) ** 2)** 5訂單項目的訂單消息。 (isbn#:016,017,018,019,020) ** 3)**具有2個訂單項的訂單消息。 (isbn#:021,022) **發佈了三個消息組**。組1(011,012,013,014),組2(015,016,017,018),組3(019,020,021,022)。 但是,由於某種原因,在我上次評論中共享的較早序列(3 orderitem + 1 orderitem)不起作用 –
請使用'expire-groups-upon-completion =「true」並考慮在發佈時使用'MessageCountReleaseStrategy' -strategy' –