2017-01-10 68 views
2

我已經實現了從Oracle AQ中消除郵件並將其作爲IObservable暴露給系統。工作流程如下: -Oracle AQ使用反應擴展和事務處理回調ODP.NET

  1. 應用程序收到Oracle有關新消息的回調事件。
  2. 應用程序將消息出隊並將其添加到IObservable(消息出隊作爲消息出隊後立即提交的事務的一部分)。

我意識到一個潛在的問題,那就是當消息出隊時,事務立即提交,而不是等待它被應用程序成功使用。下面是我正在使用的代碼,但需要一個建議,其中/如何在應用程序成功使用後提交事務。目前它啓動並提交/回滾專用Dequeue方法中的事務。

public sealed class Queue<T> : IQueue<T> where T : IQueueDataType 
{ 
    private readonly OracleConnection _connection; 

    private readonly string _consumerName; 

    private readonly IQueueSetting _queueSetting; 

    private readonly IDbConnectionFactory _dbConnectionFactory; 

    private OracleAQQueue _queue; 

    private IObservable<T> _messages; 

    private bool _isDisposed; 

    public Queue(IDbConnectionFactory dbConnectionFactory, IDalSettings dalSettings, IQueueSetting queueSetting) 
    { 
     _dbConnectionFactory = dbConnectionFactory; 
     _connection = dbConnectionFactory.Create() as OracleConnection; 
     _consumerName = dalSettings.Consumer; 
     _queueSetting = queueSetting; 

    } 

    public void Connect() 
    {    
     _connection.Open(); 
     _queue = new OracleAQQueue(_queueSetting.QueueName, _connection) 
     { 
      DequeueOptions = { Wait = 10, Visibility = OracleAQVisibilityMode.Immediate , ConsumerName = _consumerName, NavigationMode = OracleAQNavigationMode.FirstMessage, DequeueMode = OracleAQDequeueMode.Remove}, 
      UdtTypeName = _queueSetting.QueueDataTypeName, 
      MessageType = OracleAQMessageType.Udt 
     }; 

     _queue.NotificationConsumers = new[] { _consumerName }; 

     _messages = Observable 
      .FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs>(
       h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h) 
      .Where(x => x.EventArgs.AvailableMessages > 0) 
      .Select(x => 
      { 
       try 
       { 
       Log.Info("Msg received", "Queue", _queueSetting.QueueName); 

        OracleAQMessage msg = Dequeue(); 

        Log.Info("Msg received id " + msg.MessageId, "Queue", _queueSetting.QueueName); 
        return (T)msg.Payload; 
       } 
       catch (Exception e) 
       { 

       } 
      }).Publish().RefCount(); 

    } 

    private OracleAQMessage Dequeue() 
    { 
     using (var connection = _dbConnectionFactory.Create() as OracleConnection) 
     { 
      try 
      { 
       connection.Open(); 
       using (OracleTransaction transaction = connection.BeginTransaction()) 
       { 
        try 
        { 
         OracleAQMessage msg = _queue.Dequeue(); 
         **transaction.Commit();** 
         return msg; 
        } 
        catch (Exception e) 
        {       
         **transaction.Rollback();** 
         throw; 
        } 
       } 
      } 
      catch (Exception e) 
      { 
       Log.Error(string.Format("Error occurred while connecting to database to dequeue new message. Error : {0}", e), 
        "Dequeue", GetType().FullName); 
       throw; 
      } 
      finally 
      { 
       connection.Close(); 
      } 
     } 
    } 

    public IObservable<T> GetMessages() 
    { 
     return _messages; 
    } 

    public void Dispose() 
    {   
     if (!_isDisposed) 
     { 
      if (_queue != null) 
      { 
       _queue.Dispose();  
      } 

      _connection.Dispose(); 
      _isDisposed = true; 
     } 
    } 
} 

如果不是使用IObservable,我只是公開的事件提交和回滾事務時會很容易,但我想我可以用IObservable即做,我可以運行一個Linq,但不知道如何提交交易。

回答

1

我不認爲這裏有一個簡單的解決方案。如果我的理解是正確的:

  1. 事件是由甲骨文推,
  2. 你想通過IObservable流暴露的情況下,
  3. 應用「處理」了,
  4. 要提交如果處理成功,否則回滾。

問題是IObservable是一種單向機制。一旦你發佈了一條消息(在我們的例子中你從這個Oracle隊列中得到了一些東西),其目的不是要跟蹤它,而是稍後決定是否提交/回滾。所以,你的選擇是相當多的東西你的應用程序邏輯的某種形式的處理程序:

Func<OracleMessage, bool> isMessageCommitable; //...application handling logic here 

var appHandledMessages = oracleSourceMessages 
    .Select(m => Tuple.Create(m, isMessageCommitable(m))) 
    .Publish() 
    .RefCount(); 

appHandledMessages 
    .Where(t => t.Item2) 
    .Subscribe(t => Commit(t.Item1)); 

appHandledMessages 
    .Where(t => !t.Item2) 
    .Subscribe(t => Rollback(t.Item1)); 

...或設立IObservable指點下這會從應用程序推回到隊列中的其他方式,也應該提交/回滾消息。您可能需要兩個,一個用於提交,一個用於回滾,並且可能需要傳入構造函數Queue<T>

祝你好運。

+0

謝謝。你能否提供第二種情況的例子「IObservables指向另一種方式...........」 – tangokhi