2011-02-11 88 views
8

我有一個流媒體服務器與合同看起來像這樣:如何檢測是否WCF流媒體客戶端斷開中流

[ServiceContract] 
public interface IStreamingService 
{ 
    [OperationContract(Action = "StreamingMessageRequest", ReplyAction = "StreamingMessageReply")] 
    Message GetStreamingData(Message query); 
} 

這裏有東西去除,以簡化的東西基本的實現(如錯誤處理) :

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)] 
[ErrorBehavior(typeof(StreamingServiceErrorHandler))] 
public class StreamingService : IStreamingService 
{ 
    public StreamingService() 
    { 
    } 

    public Message GetStreamingData(Message query) 
    { 
     var dataQuery = query.GetBody<DataQuery>(); 

     // Hook up events to let us know if the client disconnects so that we can stop the query... 
     EventHandler closeAction = (sender, ea) => 
     { 
      dataQuery.Stop(); 
     }; 
     OperationContext.Current.Channel.Faulted += closeAction; 
     OperationContext.Current.Channel.Closed += closeAction; 

     Message streamingMessage = Message.CreateMessage(
      MessageVersion.Soap12WSAddressing10, 
      "QueryMessageReply", 
      new StreamingBodyWriter(QueryMethod(dataQuery)); 
     return streamingMessage; 
    } 

    public IEnumerable<object> QueryMethod (DataQuery query) 
    { 
     // Returns a potentially infinite stream of objects in response to the query 
    } 
} 

此實現使用自定義BodyWriter從QueryMethod流的結果:

public class StreamingBodyWriter : BodyWriter 
{ 
    public StreamingBodyWriter(IEnumerable items) 
     : base(false) // False should be passed here to avoid buffering the message 
    { 
     Items = items; 
    } 

    internal IEnumerable Items { get; private set; } 

    private void SerializeObject(XmlDictionaryWriter writer, object item) 
    { 
     // Serialize the object to the stream 
    } 

    protected override void OnWriteBodyContents(XmlDictionaryWriter writer) 
    { 
     foreach (object item in Items) 
     { 
      SerializeObject(writer, item); 
     } 
    } 
} 

客戶端連接並開始讀取數據流。就像這樣:

public IEnumerable<T> GetStreamingData<T>(Message queryMessage) 
{ 
    Message reply = _Server.GetStreamingData(queryMessage); 

    // Get a chunckable reader for the streaming reply 
    XmlReader reader = reply.GetReaderAtBodyContents(); 

    // Read the stream of objects, deserializing each one as appropriate 
    while (!reader.EOF) 
    { 
     object item = DeserializeObject(reader); 
     if (item == null) 
      continue; 

     // Yield each item as it's deserialized, resulting in a [potentially] never-ending stream of objects 
     yield return (T)item; 
    } 
} 

這很好,我得到一個對象流回客戶端。非常好。問題在於客戶端在中途斷開(無論是優雅還是不正常)。在任何一種情況下,服務器都不會收到有關此斷開連接的通知,除非是由服務的錯誤處理程序拾取的錯誤。

正如你所看到的,我已經嘗試在服務方法中掛鉤Faulted和Closed通道事件,但是當客戶端斷開連接時它們不會觸發。

我懷疑這些事件不會觸發,因爲流式合約服務已經從操作方法(GetStreamingData)返回。此方法已返回帶有自定義身體寫入器的消息,並且正文寫入(在服務通道堆棧中較低)正在從計算線程(通過IEnumerable)獲取結果並將它們傳輸到回覆消息中。由於操作方法已經返回,我猜這些通道事件不會引發火災。

該綁定是net.tcp綁定(非雙工)的自定義版本,只有兩個綁定元素:BinaryMessageEncodingBindingElement和TcpTransportBindingElement。沒有安全或任何東西。基本上只是原始的net.tcp與二進制消息。

問題是,當客戶端斷開連接時,我希望服務器停止正在生成結果的背景計算線程,並將它們饋送到由BodyWriter讀取的IEnumerable中。身體作家已經停止(顯然),但線程繼續存在。

那麼我可以在哪裏掛鉤以發現客戶端是否已經斷開中間流?

更新:

主要有兩種情況斷線:顯式斷開,這或者是由於處置在客戶端的客戶端代理或過程的終止;被動斷開,通常是由於網絡故障(例如:ISP斷開連接,或者網絡電纜被拔出)。

在第一種情況下,存在顯式連接異常,服務器在嘗試向流中發送新數據時接收到get。沒問題。

破損網管的第二種情況更麻煩。通常這種情況是基於發送超時來檢測的,但由於這是一個流式接口,所以發送超時的速度是上升的(由於可以想象流式數據傳輸可以持續這麼長時間,我已經將其啓動了幾天)。所以當一個客戶端由於一個脆弱的網絡管道而斷開連接時,服務會繼續發送數據到一個真正存在的連接。

在這一點上,我不知道解決第二個選擇的好方法。建議?

+0

我也遇到了這個問題,以及幾乎完全相同的配置與你。我需要知道客戶端何時斷開連接,以便快速處理稀缺資源。你是否曾經能夠找到解決方案或解決方法?謝謝。 – Paccc 2013-01-19 20:17:53

回答

1

Faulted事件對於處理這些類型的場景來說確實是最好的事情,但只有在啓用了reliableSession時纔會觸發它。它在標準netTcpBinding中默認啓用,但在這種情況下不會啓用,因爲您使用的是綁定的自定義分拆版本。嘗試添加到您的自定義綁定:)

+0

我會試試這個。我似乎記得,可靠的會話不適用於流式綁定,但我會再次檢查。 – 2011-06-21 19:26:08

相關問題