2016-06-14 94 views
0

我有一個C#應用程序,它設置了多個MQ偵聽器(多個線程和可能多個服務器,每個都有自己的偵聽器)。有一些消息會從隊列中離開,我將要離開隊列,轉到MQ上的下一條消息,但是在某些情況下,我會想要重新讀取這些消息....NET IBM MQ Listener未確認消息並從隊列的開頭讀取

var connectionFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ).CreateConnectionFactory(); 
connectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, origination.Server); 
connectionFactory.SetIntProperty(XMSC.WMQ_PORT, int.Parse(origination.Port)); 
connectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, origination.QueueManager); 
connectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, origination.Channel); 

var connection = connectionFactory.CreateConnection(null, null); 
_connections.Add(connection); 

var session = connection.CreateSession(false, AcknowledgeMode.ClientAcknowledge); //changed to use ClientAcknowledge so that we will leave the message on the MQ until we're sure we're processing it 
_sessions.Add(session); 

var destination = session.CreateQueue(origination.Queue); 
_destinations.Add(destination); 

var consumer = session.CreateConsumer(destination); 
_consumers.Add(consumer); 

Logging.LogDebugMessage(Constants.ListenerStart); 

connection.Start(); 
ThreadPool.QueueUserWorkItem((o) => Receive(forOrigination, consumer)); 

然後,我有...

if (OnMQMessageReceived != null) 
{ 
    var message = consumer.Receive(); 
    var identifier = string.Empty; 

    if (message is ITextMessage) 
    { 
     //do stuff with the message here 
     //populates identifier from the message 
    } 
    else 
    { 
     //do stuff with the message here 
     //populates identifier from the message 
    } 

    if (!string.IsNullOrWhiteSpace(identifier)&& OnMQMessageReceived != null) 
    { 
     if(some check to see if we should process the message now) 
     { 
      //process message here 
      message.Acknowledge(); //this really pulls it off of the MQ 

      //here is where I want to trigger the next read to be from the beginning of the MQ 
     } 
     else 
     { 
      //We actually want to do nothing here. As in do not do Acknowledge 
      //This leaves the message on the MQ and we'll pick it up again later 
      //But we want to move on to the next message in the MQ 
     } 
    } 
    else 
    { 
     message.Acknowledge(); //this really pulls it off of the MQ...its useless to us anyways 
    } 
} 
else 
{ 
    Thread.Sleep(0); 
} 

ThreadPool.QueueUserWorkItem((o) => Receive(forOrigination, consumer)); 

那麼幾個問題:

  1. 如果我不承認它停留在MQ消息,對不對?

  2. 如果消息未被確認,那麼默認情況下,當我再次從MQ中讀取同一個偵聽器時,它將讀取下一個並且不會到達開始,對吧?

  3. 如何更改偵聽器,以便下次讀取我從隊列的開頭處開始?

回答

2

將消息留在隊列上是一種反模式。如果您不想或無法在您的邏輯的某個特定點處理消息,則您有多種選擇:

  • 將它從隊列中取出並放入另一個隊列/主題以獲得延遲/不同的處理。
  • 將它從隊列中取出並轉儲到數據庫,平面文件 - 無論如何,如果您想在消息流之外處理它,或者根本不想處理它。
  • 如果可行,您可能需要更改消息生產者,以便它不會在同一隊列/主題中混合具有不同處理要求的消息。

在任何情況下,不要在隊列中留下消息,並且總是前進到下一條消息。這將使應用程序的方式更具可預測性,更易於推理。您還將避免各種性能問題。如果您的應用程序對消息傳遞的順序敏感,那麼對所選消息的手動確認也會與此不一致。

您的問題:

  1. 的JMS規範是模糊的關於未確認的消息的行爲 - 他們可能會按順序發送,這是不確定什麼時候,當他們將交付。此外,確認方法調用將確認所有先前收到和未確認的消息 - 可能不是您想到的。

  2. 如果您留下了消息,聽衆可能會立即返回,也可能不會立即返回。如果重新啓動它,它當然會重新開始,但是當它坐在那裏等待消息時,它依賴於實現。

所以,如果你試圖讓你的設計工作,你可能會在某些情況下得到它的工作,但它不會是可預測的或可靠的。