我已經實現了從Oracle AQ中消除郵件並將其作爲IObservable暴露給系統。工作流程如下: -Oracle AQ使用反應擴展和事務處理回調ODP.NET
- 應用程序收到Oracle有關新消息的回調事件。
- 應用程序將消息出隊並將其添加到IObservable(消息出隊作爲消息出隊後立即提交的事務的一部分)。
我意識到一個潛在的問題,那就是當消息出隊時,事務立即提交,而不是等待它被應用程序成功使用。下面是我正在使用的代碼,但需要一個建議,其中/如何在應用程序成功使用後提交事務。目前它啓動並提交/回滾專用Dequeue方法中的事務。
public sealed class Queue<T> : IQueue<T> where T : IQueueDataType
{
private readonly OracleConnection _connection;
private readonly string _consumerName;
private readonly IQueueSetting _queueSetting;
private readonly IDbConnectionFactory _dbConnectionFactory;
private OracleAQQueue _queue;
private IObservable<T> _messages;
private bool _isDisposed;
public Queue(IDbConnectionFactory dbConnectionFactory, IDalSettings dalSettings, IQueueSetting queueSetting)
{
_dbConnectionFactory = dbConnectionFactory;
_connection = dbConnectionFactory.Create() as OracleConnection;
_consumerName = dalSettings.Consumer;
_queueSetting = queueSetting;
}
public void Connect()
{
_connection.Open();
_queue = new OracleAQQueue(_queueSetting.QueueName, _connection)
{
DequeueOptions = { Wait = 10, Visibility = OracleAQVisibilityMode.Immediate , ConsumerName = _consumerName, NavigationMode = OracleAQNavigationMode.FirstMessage, DequeueMode = OracleAQDequeueMode.Remove},
UdtTypeName = _queueSetting.QueueDataTypeName,
MessageType = OracleAQMessageType.Udt
};
_queue.NotificationConsumers = new[] { _consumerName };
_messages = Observable
.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs>(
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
try
{
Log.Info("Msg received", "Queue", _queueSetting.QueueName);
OracleAQMessage msg = Dequeue();
Log.Info("Msg received id " + msg.MessageId, "Queue", _queueSetting.QueueName);
return (T)msg.Payload;
}
catch (Exception e)
{
}
}).Publish().RefCount();
}
private OracleAQMessage Dequeue()
{
using (var connection = _dbConnectionFactory.Create() as OracleConnection)
{
try
{
connection.Open();
using (OracleTransaction transaction = connection.BeginTransaction())
{
try
{
OracleAQMessage msg = _queue.Dequeue();
**transaction.Commit();**
return msg;
}
catch (Exception e)
{
**transaction.Rollback();**
throw;
}
}
}
catch (Exception e)
{
Log.Error(string.Format("Error occurred while connecting to database to dequeue new message. Error : {0}", e),
"Dequeue", GetType().FullName);
throw;
}
finally
{
connection.Close();
}
}
}
public IObservable<T> GetMessages()
{
return _messages;
}
public void Dispose()
{
if (!_isDisposed)
{
if (_queue != null)
{
_queue.Dispose();
}
_connection.Dispose();
_isDisposed = true;
}
}
}
如果不是使用IObservable
,我只是公開的事件提交和回滾事務時會很容易,但我想我可以用IObservable
即做,我可以運行一個Linq
,但不知道如何提交交易。
謝謝。你能否提供第二種情況的例子「IObservables指向另一種方式...........」 – tangokhi