我有一個流媒體服務器與合同看起來像這樣:如何檢測是否WCF流媒體客戶端斷開中流
[ServiceContract]
public interface IStreamingService
{
[OperationContract(Action = "StreamingMessageRequest", ReplyAction = "StreamingMessageReply")]
Message GetStreamingData(Message query);
}
這裏有東西去除,以簡化的東西基本的實現(如錯誤處理) :
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
[ErrorBehavior(typeof(StreamingServiceErrorHandler))]
public class StreamingService : IStreamingService
{
public StreamingService()
{
}
public Message GetStreamingData(Message query)
{
var dataQuery = query.GetBody<DataQuery>();
// Hook up events to let us know if the client disconnects so that we can stop the query...
EventHandler closeAction = (sender, ea) =>
{
dataQuery.Stop();
};
OperationContext.Current.Channel.Faulted += closeAction;
OperationContext.Current.Channel.Closed += closeAction;
Message streamingMessage = Message.CreateMessage(
MessageVersion.Soap12WSAddressing10,
"QueryMessageReply",
new StreamingBodyWriter(QueryMethod(dataQuery));
return streamingMessage;
}
public IEnumerable<object> QueryMethod (DataQuery query)
{
// Returns a potentially infinite stream of objects in response to the query
}
}
此實現使用自定義BodyWriter從QueryMethod流的結果:
public class StreamingBodyWriter : BodyWriter
{
public StreamingBodyWriter(IEnumerable items)
: base(false) // False should be passed here to avoid buffering the message
{
Items = items;
}
internal IEnumerable Items { get; private set; }
private void SerializeObject(XmlDictionaryWriter writer, object item)
{
// Serialize the object to the stream
}
protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
{
foreach (object item in Items)
{
SerializeObject(writer, item);
}
}
}
客戶端連接並開始讀取數據流。就像這樣:
public IEnumerable<T> GetStreamingData<T>(Message queryMessage)
{
Message reply = _Server.GetStreamingData(queryMessage);
// Get a chunckable reader for the streaming reply
XmlReader reader = reply.GetReaderAtBodyContents();
// Read the stream of objects, deserializing each one as appropriate
while (!reader.EOF)
{
object item = DeserializeObject(reader);
if (item == null)
continue;
// Yield each item as it's deserialized, resulting in a [potentially] never-ending stream of objects
yield return (T)item;
}
}
這很好,我得到一個對象流回客戶端。非常好。問題在於客戶端在中途斷開(無論是優雅還是不正常)。在任何一種情況下,服務器都不會收到有關此斷開連接的通知,除非是由服務的錯誤處理程序拾取的錯誤。
正如你所看到的,我已經嘗試在服務方法中掛鉤Faulted和Closed通道事件,但是當客戶端斷開連接時它們不會觸發。
我懷疑這些事件不會觸發,因爲流式合約服務已經從操作方法(GetStreamingData)返回。此方法已返回帶有自定義身體寫入器的消息,並且正文寫入(在服務通道堆棧中較低)正在從計算線程(通過IEnumerable)獲取結果並將它們傳輸到回覆消息中。由於操作方法已經返回,我猜這些通道事件不會引發火災。
該綁定是net.tcp綁定(非雙工)的自定義版本,只有兩個綁定元素:BinaryMessageEncodingBindingElement和TcpTransportBindingElement。沒有安全或任何東西。基本上只是原始的net.tcp與二進制消息。
問題是,當客戶端斷開連接時,我希望服務器停止正在生成結果的背景計算線程,並將它們饋送到由BodyWriter讀取的IEnumerable中。身體作家已經停止(顯然),但線程繼續存在。
那麼我可以在哪裏掛鉤以發現客戶端是否已經斷開中間流?
更新:
主要有兩種情況斷線:顯式斷開,這或者是由於處置在客戶端的客戶端代理或過程的終止;被動斷開,通常是由於網絡故障(例如:ISP斷開連接,或者網絡電纜被拔出)。
在第一種情況下,存在顯式連接異常,服務器在嘗試向流中發送新數據時接收到get。沒問題。
破損網管的第二種情況更麻煩。通常這種情況是基於發送超時來檢測的,但由於這是一個流式接口,所以發送超時的速度是上升的(由於可以想象流式數據傳輸可以持續這麼長時間,我已經將其啓動了幾天)。所以當一個客戶端由於一個脆弱的網絡管道而斷開連接時,服務會繼續發送數據到一個真正存在的連接。
在這一點上,我不知道解決第二個選擇的好方法。建議?
我也遇到了這個問題,以及幾乎完全相同的配置與你。我需要知道客戶端何時斷開連接,以便快速處理稀缺資源。你是否曾經能夠找到解決方案或解決方法?謝謝。 – Paccc 2013-01-19 20:17:53