2013-02-27 133 views
0

我正在通過STOMP與ActiveMQ交互。我有一個發佈消息的過程和一個訂閱和處理消息的多個過程(大約10個並行實例)。使用STOMP從ActiveMQ隊列中讀取非阻塞事務

閱讀消息後,我想確定如果出於某種原因,我的應用程序失敗/崩潰,消息不會丟失。自然,我轉向交易。不幸的是,我發現一旦消費者讀取消息作爲交易的一部分,所有以下消息都不會被髮送給其他消費者,直到交易結束。

測試案例:abc隊列有100條消息。如果我在兩個不同的瀏覽器選項卡中激活以下代碼,第一個將在10秒內返回,第二個將在20秒內返回。

<?php 
// Reader.php 
$con = new Stomp("tcp://localhost:61613"); 
$con->connect(); 

$con->subscribe(
    "/queue/abc", 
    array() 
); 

$tx = "tx3".microtime(); 
echo "TX:$tx<BR>"; 
$con->begin($tx); 
$messages = array(); 
for ($i = 0; $i < 10; $i++) { 
    $t = microtime(true); 
    $msg = $con->readFrame(); 
    if (!$msg) { 
     die("FAILED!"); 
    } 
    $t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>"; 
    array_push($messages, $msg); 
    $con->ack($msg, $tx); 
    sleep(1); 
} 
$con->abort($tx); 

有什麼我缺少代碼明智嗎?是否有配置ActiveMQ(或發送頭文件)的方法,可以使事務從隊列中刪除項目,允許其他進程使用其他消息,並且如果事務失敗或超時,將把項目恢復到? PS:我想過爲每個閱讀過程創建另一個隊列--DetentionQueue,但我真的寧願不這樣做,如果我有選擇。

回答

1

您可能需要調整預訂的預取大小,以便ActiveMQ在客戶端2有機會獲得任何請求之前不會將隊列上的消息發送到客戶端1。默認情況下,它被設置爲1000,以便最好地調整它以適合您的用例。

您可以通過預訂幀上的「activemq.prefetchSize = 1」標頭設置預取大小。有關所有框架選項,請參閱ActiveMQ Stomp頁面。

+0

我的特定ActiveMQ版本的默認預取大小爲1(根據AvctiveMQ控制檯進行檢查)。可以肯定的是,我檢查了你的建議(使用activemq.prefetchSize = 1),但沒有改變結果。 但是,謝謝,無論如何:) – ygilad 2013-03-02 00:37:53