異步處理隊列項目是否有很好的實現?異步處理項目的實現
2
A
回答
2
雖然比較舊,但這是我所知道的一個不錯的實現:http://www.codeproject.com/KB/cs/inprocessasynservicesincs.aspx
5
如果您使用的是 .NET 4,那麼其中很多功能都是現成的。
如果您已經擁有所有項目,可以使用Parallel.ForEach
。如果您需要生產者/消費者隊列,則可以使用BlockingCollection<T>
來包裝其中一個併發集合(例如ConcurrentQueue<T>
或ConcurrentStack<T>
)。具體如何使用取決於您;這裏有一篇博客文章(blog post)給出了詳細的例子,可能還有其他類似的帖子。(您也可以查看 Parallel Team Blog 獲取更多資料。)
2
使用.NET 4任務。
// Start ProcessItem as a task; `t` represents its pending int result.
Task<int> t = Task.Factory.StartNew<int>(() => ProcessItem());
使用ConcurrencyOptions設置該處理的最大並行度。
如果您想自己推出,請使用BlockingCollection<T>
,它爲線程安全集合提供了阻塞和邊界功能,併爲消費者實現了一個單獨的線程(或多個線程)。
3
如果你不幸無法使用 .NET 4,可以看看生產者/消費者模式。
以下是我反編譯得到的代碼,比較雜亂,對此表示抱歉;不過你應該可以把它添加到項目中重新編譯,然後使用生成的 dll 來創建你的處理流程。
枚舉爲ChannelState:
/// <summary>
/// Describes which side of a channel, if any, is currently waiting on the other.
/// </summary>
public enum ChannelState
{
    /// <summary>A receiver is waiting for a message to be sent.</summary>
    WaitingForSend = 0,

    /// <summary>A sender is waiting for its message to be received.</summary>
    WaitingForReceive = 1,

    /// <summary>Neither side is waiting.</summary>
    Open = 2
}
接口:
/// <summary>
/// A conduit through which messages of type <typeparamref name="TMessage"/>
/// are handed from senders to receivers.
/// </summary>
/// <typeparam name="TMessage">Type of the messages carried by the channel.</typeparam>
public interface IChannel<TMessage>
{
    /// <summary>Gets whether the channel can be received from.</summary>
    bool CanReceive { get; }

    /// <summary>Gets whether the channel can be sent to.</summary>
    bool CanSend { get; }

    /// <summary>Gets which side of the channel, if any, is currently waiting.</summary>
    ChannelState State { get; }

    /// <summary>Takes a message out of the channel.</summary>
    /// <returns>The message that was received.</returns>
    TMessage Receive();

    /// <summary>Hands a message to the channel.</summary>
    /// <param name="message">The message to send.</param>
    void Send(TMessage message);
}
using System;
/// <summary>
/// A component that can be switched on and off and that announces each
/// message it receives through <see cref="MessageReceived"/>.
/// </summary>
/// <typeparam name="TMessage">Type of the messages being received.</typeparam>
public interface IReceiver<TMessage>
{
    /// <summary>Gets whether the receiver is currently activated.</summary>
    bool IsActive { get; }

    /// <summary>Raised for every message the receiver obtains.</summary>
    event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived;

    /// <summary>Starts receiving messages.</summary>
    void Activate();

    /// <summary>Stops receiving messages.</summary>
    void Deactivate();
}
具體類:
using System.Collections.Generic;
using System.Threading;
using System;
/// <summary>
/// A bounded first-in/first-out channel. <see cref="Send"/> blocks while the
/// buffer is full and <see cref="Receive"/> blocks while it is empty.
/// </summary>
/// <remarks>
/// All state is guarded by a single monitor (the buffer itself) and waiting is
/// done with <c>Monitor.Wait</c> in a re-checking loop. Compared with a pair of
/// auto-reset events this cannot lose a wakeup when several receivers arrive
/// together, cannot let racing senders exceed the capacity, and owns no OS
/// handles that would need disposing.
/// </remarks>
/// <typeparam name="TMessage">Type of the messages carried by the channel.</typeparam>
public class BufferedChannel<TMessage> : IChannel<TMessage>
{
    // Capacity used by the parameterless constructor.
    private const int DefaultCapacity = 50;

    // Fields (all guarded by lock (_buffer))
    private int _blockedReceivers;
    private int _blockedSenders;
    private readonly Queue<TMessage> _buffer = new Queue<TMessage>();
    private int _capacity;

    // Methods

    /// <summary>Creates a channel that buffers up to 50 messages.</summary>
    public BufferedChannel()
        : this(DefaultCapacity)
    {
    }

    /// <summary>Creates a channel that buffers up to <paramref name="bufferSize"/> messages.</summary>
    /// <param name="bufferSize">Maximum number of buffered messages; must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
    public BufferedChannel(int bufferSize)
    {
        // Validate before touching any state so a bad argument leaves nothing behind.
        if (bufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException("bufferSize", bufferSize, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero);
        }
        this._capacity = bufferSize;
    }

    /// <summary>
    /// Removes and returns the oldest buffered message, blocking while the buffer is empty.
    /// </summary>
    /// <returns>The oldest message in the buffer.</returns>
    /// <exception cref="ThreadInterruptedException">The waiting thread was interrupted.</exception>
    public TMessage Receive()
    {
        lock (this._buffer)
        {
            this._blockedReceivers++;
            try
            {
                // Re-check after every wakeup: another receiver may have taken the message.
                while (this._buffer.Count == 0)
                {
                    Monitor.Wait(this._buffer);
                }
            }
            finally
            {
                // Runs under the lock even when Wait throws, so the counter never leaks.
                this._blockedReceivers--;
            }
            TMessage message = this._buffer.Dequeue();
            // Senders and receivers share one monitor, so wake everyone and let
            // each waiter re-evaluate its own condition.
            Monitor.PulseAll(this._buffer);
            return message;
        }
    }

    /// <summary>
    /// Appends a message to the buffer, blocking while the buffer is at capacity.
    /// </summary>
    /// <param name="message">The message to enqueue.</param>
    /// <exception cref="ThreadInterruptedException">The waiting thread was interrupted.</exception>
    public void Send(TMessage message)
    {
        lock (this._buffer)
        {
            this._blockedSenders++;
            try
            {
                // Re-check after every wakeup: capacity may have been taken or shrunk.
                while (this._buffer.Count >= this._capacity)
                {
                    Monitor.Wait(this._buffer);
                }
            }
            finally
            {
                this._blockedSenders--;
            }
            this._buffer.Enqueue(message);
            Monitor.PulseAll(this._buffer);
        }
    }

    // Properties

    /// <summary>Gets the number of messages currently buffered.</summary>
    public int BufferCount
    {
        get
        {
            lock (this._buffer)
            {
                return this._buffer.Count;
            }
        }
    }

    /// <summary>Gets or sets the maximum number of messages the buffer may hold.</summary>
    /// <exception cref="ArgumentOutOfRangeException">The assigned value is zero or negative.</exception>
    public int BufferSize
    {
        get
        {
            lock (this._buffer)
            {
                return this._capacity;
            }
        }
        set
        {
            lock (this._buffer)
            {
                if (value <= 0)
                {
                    throw new ArgumentOutOfRangeException("BufferSize", value, ExceptionMessages.ChannelsBufferSizeMustBeGreaterThanZero);
                }
                this._capacity = value;
                // Growing the buffer may unblock waiting senders.
                if ((this._blockedSenders > 0) && (this._capacity > this._buffer.Count))
                {
                    Monitor.PulseAll(this._buffer);
                }
            }
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool CanReceive
    {
        get
        {
            return true;
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool CanSend
    {
        get
        {
            return true;
        }
    }

    /// <summary>
    /// Gets <see cref="ChannelState.WaitingForReceive"/> while any sender is waiting,
    /// otherwise <see cref="ChannelState.WaitingForSend"/> while any receiver is waiting,
    /// otherwise <see cref="ChannelState.Open"/>.
    /// </summary>
    public ChannelState State
    {
        get
        {
            lock (this._buffer)
            {
                if (this._blockedSenders > 0)
                {
                    return ChannelState.WaitingForReceive;
                }
                if (this._blockedReceivers > 0)
                {
                    return ChannelState.WaitingForSend;
                }
                return ChannelState.Open;
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Threading;
using System.ComponentModel;
using System.Runtime.CompilerServices;
/// <summary>
/// Pumps messages out of an <see cref="IChannel{TMessage}"/> on a dedicated
/// background thread and raises <see cref="MessageReceived"/> for each one.
/// </summary>
/// <remarks>
/// <see cref="Deactivate"/> stops the pump by clearing the run flag and, if the
/// thread is parked inside <c>Channel.Receive()</c>, interrupting it. It is safe
/// to call <see cref="Deactivate"/> or <c>Dispose()</c> from inside a
/// <see cref="MessageReceived"/> handler: the receiver thread does not try to
/// join itself and exits as soon as the handler returns.
/// NOTE(review): <see cref="Deactivate"/> still joins the worker while holding
/// its control lock, so a handler that touches this receiver (for example
/// <see cref="IsActive"/>) while another thread is deactivating it can deadlock.
/// </remarks>
/// <typeparam name="TMessage">Type of the messages being received.</typeparam>
public sealed class Receiver<TMessage> : Component, IReceiver<TMessage>
{
    // Fields
    private volatile bool _continue;
    private readonly object _controlLock = new object();
    private volatile bool _disposed;
    // Volatile because RunAsync compares it with the current thread without taking a lock.
    private volatile Thread _receiverThread;
    // True while the worker may be parked inside Channel.Receive(); guarded by _receivingLock.
    private bool _receiving;
    private readonly object _receivingLock = new object();
    private readonly object _threadLock = new object();

    // Events

    /// <summary>Raised on the receiver thread for every message taken from the channel.</summary>
    public event EventHandler<MessageReceivedEventArgs<TMessage>> MessageReceived;

    // Methods

    /// <summary>Creates an inactive receiver bound to <paramref name="channel"/>.</summary>
    /// <param name="channel">The channel to receive from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
    public Receiver(IChannel<TMessage> channel)
    {
        if (channel == null)
        {
            throw new ArgumentNullException("channel");
        }
        this.Channel = channel;
    }

    /// <summary>Starts the background receiver thread.</summary>
    /// <exception cref="ObjectDisposedException">The receiver has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The receiver is already active.</exception>
    public void Activate()
    {
        this.CheckDisposed();
        lock (this._controlLock)
        {
            if (this._receiverThread != null)
            {
                throw new InvalidOperationException();
            }
            this._continue = true;
            Thread worker = new Thread(new ThreadStart(this.RunAsync));
            worker.IsBackground = true;
            // Publish the thread before starting it so RunAsync sees itself registered.
            this._receiverThread = worker;
            worker.Start();
        }
    }

    // Throws ObjectDisposedException once Dispose(true) has completed.
    private void CheckDisposed()
    {
        if (this._disposed)
        {
            throw new ObjectDisposedException(base.GetType().Name);
        }
    }

    /// <summary>
    /// Stops the receiver thread. When called from another thread this waits for
    /// the receiver thread to exit; when called from the receiver thread itself
    /// (inside a handler) it returns immediately and the thread exits after the
    /// handler returns. Does nothing if the receiver is not active.
    /// </summary>
    public void Deactivate()
    {
        lock (this._controlLock)
        {
            if (!this._continue)
            {
                return;
            }
            this._continue = false;
            lock (this._threadLock)
            {
                Thread worker = this._receiverThread;
                if (worker == null)
                {
                    return;
                }
                // Joining the current thread would block forever, so only
                // interrupt and wait when some other thread is asking.
                if (!object.ReferenceEquals(worker, Thread.CurrentThread))
                {
                    this.SafeInterrupt();
                    worker.Join();
                }
                this._receiverThread = null;
            }
        }
    }

    /// <summary>Disposes the component and, when disposing, stops the receiver thread.</summary>
    /// <param name="disposing">True when called from <c>Dispose()</c>.</param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        if (disposing)
        {
            this.Deactivate();
            this._disposed = true;
        }
    }

    // Raises MessageReceived; the delegate is copied first so a concurrent
    // unsubscribe cannot null it between the check and the call.
    private void OnMessageReceived(TMessage message)
    {
        EventHandler<MessageReceivedEventArgs<TMessage>> handler = this.MessageReceived;
        if (handler != null)
        {
            handler(this, new MessageReceivedEventArgs<TMessage>(message));
        }
    }

    // True while the given thread is still the registered worker and has not been told to stop.
    private bool ShouldKeepRunning(Thread self)
    {
        return this._continue && object.ReferenceEquals(this._receiverThread, self);
    }

    // Body of the receiver thread.
    private void RunAsync()
    {
        Thread self = Thread.CurrentThread;
        while (this.ShouldKeepRunning(self))
        {
            TMessage message = default(TMessage);
            bool received = false;
            try
            {
                lock (this._receivingLock)
                {
                    this._receiving = true;
                }
                message = this.Channel.Receive();
                received = true;
                lock (this._receivingLock)
                {
                    this._receiving = false;
                }
                Thread.Sleep(0);
            }
            catch (ThreadInterruptedException)
            {
                // Expected when Deactivate interrupts a blocked Receive.
            }
            if (!this.ShouldKeepRunning(self))
            {
                // Stopping: hand an already-taken message back to the channel
                // rather than dropping it.
                if (received)
                {
                    this.Channel.Send(message);
                }
                return;
            }
            // Only dispatch real messages; an interrupt that arrived while still
            // active must not surface as a default(TMessage) event.
            if (received)
            {
                this.OnMessageReceived(message);
            }
        }
    }

    // Interrupts the worker only while it is flagged as possibly blocked in Receive.
    private void SafeInterrupt()
    {
        lock (this._receivingLock)
        {
            lock (this._threadLock)
            {
                if (this._receiving && (this._receiverThread != null))
                {
                    this._receiverThread.Interrupt();
                }
            }
        }
    }

    // Properties

    /// <summary>Always <c>true</c>.</summary>
    protected override bool CanRaiseEvents
    {
        get
        {
            return true;
        }
    }

    /// <summary>Gets the channel this receiver reads from.</summary>
    public IChannel<TMessage> Channel { get; private set; }

    /// <summary>Gets whether a receiver thread is currently registered.</summary>
    public bool IsActive
    {
        get
        {
            lock (this._controlLock)
            {
                return (this._receiverThread != null);
            }
        }
    }
}
using System;
using System.Runtime.CompilerServices;
/// <summary>
/// Event data carrying a single received message.
/// </summary>
/// <typeparam name="TMessage">Type of the received message.</typeparam>
public class MessageReceivedEventArgs<TMessage> : EventArgs
{
    /// <summary>Wraps <paramref name="message"/> as event data.</summary>
    /// <param name="message">The message that was received.</param>
    public MessageReceivedEventArgs(TMessage message)
    {
        this.Message = message;
    }

    /// <summary>Gets the message supplied at construction.</summary>
    public TMessage Message { get; private set; }
}
using System.Threading;
/// <summary>
/// An unbuffered channel: <see cref="Send"/> blocks until a receiver has
/// taken the message, and <see cref="Receive"/> blocks until a sender offers one.
/// </summary>
/// <typeparam name="TMessage">Type of the messages carried by the channel.</typeparam>
public class BlockingChannel<TMessage> : IChannel<TMessage>
{
    // Fields
    // Slot holding the message in flight; written by the sender that owns
    // _sendLock and read by the one receiver released by _messageReadyEvent.
    private TMessage _message;
    // Signalled by a receiver once it has taken the message out of the slot.
    private readonly EventWaitHandle _messageReceivedEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
    // Signalled by a sender once the slot holds a message.
    private readonly EventWaitHandle _messageReadyEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
    // Serializes senders so only one message occupies the slot at a time.
    private readonly object _sendLock = new object();
    private ChannelState _state = ChannelState.Open;
    private readonly object _stateLock = new object();

    // Methods

    /// <summary>Creates an open channel with no message in flight.</summary>
    public BlockingChannel()
    {
    }

    /// <summary>Blocks until a sender offers a message, then returns it.</summary>
    /// <returns>The message handed over by the sender.</returns>
    public TMessage Receive()
    {
        this.State = ChannelState.WaitingForSend;
        this._messageReadyEvent.WaitOne();
        // Take the message out of the slot BEFORE releasing the sender. Once the
        // sender is released it leaves _sendLock, and the next sender may
        // overwrite the slot immediately.
        TMessage message = this._message;
        this._message = default(TMessage);
        this._messageReceivedEvent.Set();
        this.State = ChannelState.Open;
        return message;
    }

    /// <summary>Offers a message and blocks until a receiver has taken it.</summary>
    /// <param name="message">The message to hand over.</param>
    public void Send(TMessage message)
    {
        lock (this._sendLock)
        {
            this._message = message;
            this.State = ChannelState.WaitingForReceive;
            this._messageReadyEvent.Set();
            this._messageReceivedEvent.WaitOne();
        }
    }

    // Properties

    /// <summary>Always <c>true</c>.</summary>
    public bool CanReceive
    {
        get
        {
            return true;
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool CanSend
    {
        get
        {
            return true;
        }
    }

    /// <summary>
    /// Gets the state most recently recorded by <see cref="Send"/> or <see cref="Receive"/>.
    /// </summary>
    public ChannelState State
    {
        get
        {
            lock (this._stateLock)
            {
                return this._state;
            }
        }
        private set
        {
            lock (this._stateLock)
            {
                this._state = value;
            }
        }
    }
}
相關問題
- 1. 在ASP.NET C#項目中實現異常處理
- 2. 處理列表的C#異步選項
- 3. 什麼庫實現消息的異步處理?
- 4. 如何實現與J2EE應用程序的異步處理
- 5. 如何實現很好地處理異步輸出的python REPL?
- 6. 在F#中實現異步http處理程序的模式?
- 7. 異步服務多次處理實體
- 8. 如何實現步驟,通過使用RxJS一步異步處理?
- 9. Node.js處理異步
- 10. 異步處理ODP.NET
- 11. Java異步處理
- 12. 如何在apache felix上實現異步處理?
- 13. C++中的異步處理
- 14. JSP中的異步處理
- 15. IValueConverter的異步實現
- 16. 如何實現頂級異常處理?
- 17. IErrorHandler實現不處理異常
- 18. 在ASP.NET 3.5中實現異常處理
- 19. 內部實現異常處理
- 20. 在JavaScript中實現異常處理
- 21. Node.js的最佳實踐異常處理 - 在異步/等待
- 22. 如何在C#測試項目中實現全局異常處理程序
- 23. 如何處理xamarin表單項目中的異常處理?
- 24. 處理JSON響應異步
- 25. BasicAck異步處理郵件
- 26. Rails異步處理模型
- 27. 處理異步響應
- 28. Tomcat 7異步處理
- 29. 異步處理.NET SQL Server?
- 30. 異步處理圖像(Django)
+1獲取到4.0 :),我只是寫這個也。 – TalentTuner 2010-10-14 06:25:29
'Parallel.ForEach'阻塞調用線程,直到它完成所有的線程,所以這不會真的幫助'異步處理隊列項目'現在呢?還是問題真的意味着「平行」而不是「異步」? – 2010-10-14 06:34:28
這很好,但不幸的是,我使用.Net 3.5 – Sumee 2010-10-14 07:36:19