2014-11-05 105 views
1

我希望能夠使用Spring Integration並行處理消息。這些消息來自多個設備,我們需要按順序處理來自同一設備的消息,但設備可以在多個線程中處理。可能有成千上萬的設備,所以我試圖找出如何儘可能使用Spring Integration的語義來基於設備ID的mod來分配處理器。我應該看什麼方法?彈簧集成消息處理按標頭信息分區

回答

1

在不知道其他需求(事務語義等)的情況下很難推廣,但最簡單的方法可能是路由器使用設備ID上的某種散列算法向多個QueueChannel發送消息(因此所有消息特定設備轉到相同的頻道)。

然後,讓一個單線程輪詢器從每個隊列中提取消息。

編輯:(應答評論)

再次,很難一概而論,但是......

AbstractMessageRouter.determineTargetChannels() - 路由器的實際返回物理信道對象(實際上是一個名單,但在大多數案例列表1)。所以,是的,您可以編程創建QueueChannel,並根據消息讓路由器返回相應的路由。

假設您希望所有消息都由相同的下游流處理,您還需要爲每個隊列通道創建一個<bridge/>,以將其橋接到流中下一個組件的輸入通道。

  • 創建QueueChannel
  • 創建BridgeHandler(設置outputChannel到下一個組件的輸入通道)
  • 創建PollingConsumer(構造函數將信道和處理程序;設置trigger等)

start()消費者。

所有這些都可以在您的自定義路由器初始化中完成,並實現determineTargetChannels()來選擇隊列。

根據事件的處理時間,我通常會建議在輪詢器線程上運行下游流,而不是設置taskExecutor以避免下一輪輪詢嘗試在完成此任務之前計劃其他任務時出現問題。您可能需要增加默認的taskScheduler的池大小。

+0

謝謝 - 我試圖弄清楚的一件事情是我們如何做「QueueChannels數量」並讓路由器識別頻道。從我看到的情況來看,映射通常僅由通道ID標識。我是否必須爲每個設備組配置一個通道,或者有沒有一種以編程方式生成通道的方法? – samyem 2014-11-05 15:51:00

+0

更新了答案。 – 2014-11-05 16:14:51