我正在通過STOMP與ActiveMQ交互。我有一個發佈消息的過程和一個訂閱和處理消息的多個過程(大約10個並行實例)。使用STOMP從ActiveMQ隊列中讀取非阻塞事務
閱讀消息後,我想確定如果出於某種原因,我的應用程序失敗/崩潰,消息不會丟失。自然,我轉向交易。不幸的是,我發現一旦消費者讀取消息作爲交易的一部分,所有以下消息都不會被髮送給其他消費者,直到交易結束。
測試案例:abc
隊列有100條消息。如果我在兩個不同的瀏覽器選項卡中激活以下代碼,第一個將在10秒內返回,第二個將在20秒內返回。
<?php
// Reader.php
$con = new Stomp("tcp://localhost:61613");
$con->connect();
$con->subscribe(
"/queue/abc",
array()
);
$tx = "tx3".microtime();
echo "TX:$tx<BR>";
$con->begin($tx);
$messages = array();
for ($i = 0; $i < 10; $i++) {
$t = microtime(true);
$msg = $con->readFrame();
if (!$msg) {
die("FAILED!");
}
$t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>";
array_push($messages, $msg);
$con->ack($msg, $tx);
sleep(1);
}
$con->abort($tx);
有什麼我缺少代碼明智嗎?是否有配置ActiveMQ(或發送頭文件)的方法,可以使事務從隊列中刪除項目,允許其他進程使用其他消息,並且如果事務失敗或超時,將把項目恢復到? PS:我想過爲每個閱讀過程創建另一個隊列--DetentionQueue,但我真的寧願不這樣做,如果我有選擇。
我的特定ActiveMQ版本的默認預取大小爲1(根據AvctiveMQ控制檯進行檢查)。可以肯定的是,我檢查了你的建議(使用activemq.prefetchSize = 1),但沒有改變結果。 但是,謝謝,無論如何:) – ygilad 2013-03-02 00:37:53