2012-09-04 110 views
2

我有我用C#編寫的代理服務器。我也有一個Java applet,通過這個C#代理接收來自視頻服務器的MJPEG數據。我的問題是,當視頻服務器當前沒有更多的MJPEG數據可用時,代理卡住在阻塞讀取調用中,我無法終止它。多線程生產者/消費者

// write the forwarded output 
// blocking on remoteServerResponseStream.Read 
while (m_running && (read = remoteServerResponseStream.Read(buffer, 0, buffer.Length)) > 0) 
{ 
    bytesRead += read; 

    output.Write(buffer, 0, read); 

    output.Flush(); 
} 

這應該由Java小程序關閉流(可變output在上面的代碼)被終止。但是,Java小應用程序無法關閉此連接,因爲代理從未確認關閉請求,而它仍處於等待數據的remoteServerResponseStream.Read中。

我一直在這個問題上停留了一個星期。我想我可能會想到一個解決方案,但我不確定它是否會起作用。我很想聽到關於此的任何反饋。

我的想法是在另一個線程上使用removeServerResponseStream.Read並使用共享隊列來傳輸數據。線程將讀取數據並將其放置在隊列中。然後,我的主線程會將隊列中可用的任何數據轉發到output。這樣,我可以不斷檢查output.CanWrite是否成爲假,在這種情況下,我可以中止讀線程(這是我知道中斷阻塞流讀取的唯一方法)。這是一個可行的解決方案嗎?如果是這樣,我應該不斷地輪詢隊列以獲取可用的數據,還是應該創建一個事件?我很想聽到關於這個問題的任何想法!提前致謝。

+0

您應該使用.NET 4.0 Task Parallel Library中引入的併發集合和其他容器類。 – dthorpe

+0

我們需要額外的代碼。 –

回答

2

在這些情況下,我只是從另一個線程結束流讀取器。當你關閉閱讀器時,它會打破循環。進行額外的同步隊列只是更多的樣板,不必要的做。最終你必須停止封鎖閱讀器。將值放入隊列只會阻止您的消費者被阻止,而不會阻止您的讀者。

這裏是一個套接字阻塞讀取的例子。我將在閱讀1字節時阻止,我將故意永遠不會發送。幾秒鐘後,我將處理套接字,套接字將退出其阻塞式讀取,應用程序將優雅地退出。我將記錄每個線程正在做什麼以及什麼時候發生(每個日誌行的前綴數是線程ID,我將縮進單獨的線程)。

在使用阻塞式閱讀器的類中,您應該實現Disposable模式並關閉/處理您的閱讀器。

static void Main(string[] args) 
{ 
    SocketTest(); 

    Console.WriteLine("Press any key to exit"); 
    Console.ReadKey(); 
} 

public static void SocketTest() 
{ 
    int port = 22345; 

    var tcpListener = new TcpListener(IPAddress.Any, port); 

    tcpListener.Start(); 

    // Listening thread 
    new Thread(() => 
    { 

     Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " - Waiting for connection to port"); 

     var socket = tcpListener.AcceptSocket(); 

     Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " - Connection accepted"); 

     var stream = new NetworkStream(socket); 
     var reader = new BinaryReader(stream); 

     try 
     { 
      Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " - Starting blocking read"); 
      var bytes = reader.ReadBytes(1); 
      Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " - Done blocking read, read {0} bytes", bytes.Length); 
     } 
     catch (Exception ex) 
     { 
      Console.WriteLine("Error reading " + ex); 
     } 
    }).Start(); 

    // connecting thread 
    new Thread(() => 
    { 
     var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 

     Console.WriteLine("\t" + Thread.CurrentThread.ManagedThreadId + " - Connecting to local port"); 

     socket.Connect("127.0.0.1", port); 

     Console.WriteLine("\t" + Thread.CurrentThread.ManagedThreadId + " - Connecting to local succeeded"); 

     Thread.Sleep(TimeSpan.FromSeconds(2)); 

     Console.WriteLine("\t" + Thread.CurrentThread.ManagedThreadId + " - Disposing of socket"); 

     socket.Dispose(); 

    }).Start(); 

    Thread.Sleep(TimeSpan.FromSeconds(5)); 
} 

當你運行這個你:

3 - Waiting for connection to port 
     4 - Connecting to local port 
     4 - Connecting to local succeeded 
3 - Connection accepted 
3 - Starting blocking read 
     4 - Disposing of socket 
3 - Done blocking read, read 0 bytes 
Press any key to exit 

你有種回答自己的問題在這裏,但:

我的想法是讓removeServerResponseStream.Read在另一個線程

這正是你應該做的。當你知道你的應用已經關閉了活動線程的套接字。這個免費的封鎖線程,你可以優雅地結束。

這裏的模式是你通常爲特定套接字啓動一個線程,並且維護一個活動線程,這是一種用於請求的「控制器」線程。

+0

我的問題是我不知道什麼時候處理插座。當視頻剪輯結束時,視頻服務器停止發送數據,但保持連接處於活動狀態,因爲視頻服務器可以接收命令以查找剪輯中的不同位置。例如,如果它收到了一個命令來查找剪輯,它就會開始再次通過同一個連接發送數據。所以不通過連接發送的數據並不一定意味着我們應該關閉它。當Java applet關閉'output'時,我需要關閉連接。 – mittmemo

+1

聽起來像你知道什麼時候處理socket:當applet關閉時。如果您沒有確定性的方式來知道您的小程序何時斷開連接,則可以在套接字上分配超時值。如果在已知的時間內沒有獲取任何數據,套接字將自動關閉,但如果該值不正確,則會受到必須重新連接的支配。你可以讓你的小程序有一個ping?如果在一定的時間內沒有收到ping命令,控制器線程可以關閉相應的打開的套接字。 – devshorts

+1

@ kdg123,我根據你的評論更新了我的回覆 – devshorts