我希望能夠使用Spring Integration並行處理消息。這些消息來自多個設備,我們需要按順序處理來自同一設備的消息,但設備可以在多個線程中處理。可能有成千上萬的設備,所以我試圖找出如何儘可能使用Spring Integration的語義來基於設備ID的mod來分配處理器。我應該看什麼方法?彈簧集成消息處理按標頭信息分區
1
A
回答
1
在不知道其他需求(事務語義等)的情況下很難推廣,但最簡單的方法可能是路由器使用設備ID上的某種散列算法向多個QueueChannel
發送消息(因此所有消息特定設備轉到相同的頻道)。
然後,讓一個單線程輪詢器從每個隊列中提取消息。
編輯:(應答評論)
再次,很難一概而論,但是......
見AbstractMessageRouter.determineTargetChannels()
- 路由器的實際返回物理信道對象(實際上是一個名單,但在大多數案例列表1)。所以,是的,您可以編程創建QueueChannel
,並根據消息讓路由器返回相應的路由。
假設您希望所有消息都由相同的下游流處理,您還需要爲每個隊列通道創建一個<bridge/>
,以將其橋接到流中下一個組件的輸入通道。
- 創建
QueueChannel
- 創建
BridgeHandler
(設置outputChannel
到下一個組件的輸入通道) - 創建
PollingConsumer
(構造函數將信道和處理程序;設置trigger
等)
start()消費者。
所有這些都可以在您的自定義路由器初始化中完成,並實現determineTargetChannels()
來選擇隊列。
根據事件的處理時間,我通常會建議在輪詢器線程上運行下游流,而不是設置taskExecutor
以避免下一輪輪詢嘗試在完成此任務之前計劃其他任務時出現問題。您可能需要增加默認的taskScheduler
的池大小。
相關問題
- 1. 禁用消息歷史彈簧集成
- 2. TcpConnectionCloseEvent後的彈簧集成消息
- 3. 彈簧集成:接收來自多個JMS目標的消息
- 4. Apache Kafka主題分區消息處理
- 5. Spring批處理/集成 - 接收消息
- 6. Spring集成消息處理程序:ActiveMQTextMessage
- 7. Freemarker缺少彈簧消息
- 8. 改變彈簧MissingServletRequestPartException消息
- 9. 如何將彈簧集成消息頭保存到數據庫中
- 10. 如何在彈簧集成組件之間傳遞信息?
- 11. 通道中的消息異步推送(彈簧集成)
- 12. 彈簧集成:無法將消息發送到通道
- 13. 彈簧集成:每條消息有多個轉換器
- 14. 彈簧集成中的消息類型安全DSL
- 15. 彈簧集成中的錯誤處理
- 16. 彈簧集成中的錯誤處理
- 17. 彈簧集成隊列錯誤處理
- 18. 使用ActiveMQ的,彈簧處理的消息
- 19. 處理消息
- 20. 彈簧數據redis消息偵聽器偵聽消息兩次
- 21. 彈簧啓動異常詳細信息
- 22. 彈簧雲流默認自定義消息頭
- 23. 與嵌套json彈簧消費休息
- 24. 配置彈簧數據錯誤消息
- 25. GWT:處理消息?
- 26. 處理Windows消息
- 27. Firemonkey - 消息處理
- 28. JSP處理消息
- 29. IOT集線器消息處理器
- 30. JS處理「輸入」按鈕信息
謝謝 - 我試圖弄清楚的一件事情是我們如何做「QueueChannels數量」並讓路由器識別頻道。從我看到的情況來看,映射通常僅由通道ID標識。我是否必須爲每個設備組配置一個通道,或者有沒有一種以編程方式生成通道的方法? – samyem 2014-11-05 15:51:00
更新了答案。 – 2014-11-05 16:14:51