2015-02-07 64 views
2
var incomingStream = ... 
var outgoingStream = ... 

await incomingStream.CopyToAsync(outgoingStream); 

上面的代碼非常簡單,並將傳入流複製到outgoign流。這兩股股票都是大量的轉移匯率。現在如何創建可以轉換流的流封裝流

,可以說,我想要的東西,如Func<Stream,Stream,Task>改造流我會怎麼做,如果沒有閱讀的所有數據。

Ofcause我只是做

var ms = new MemoryStream(); 
incomingStream.CopyTo(ms); 

--- do transform of streams and seek 
ms.CopyTo(outgoingStream) 

但會讀這是否有任何內建的東西,允許我從傳入流中讀取並寫入一個不會緩衝所有內容的新流,而只是爲緩衝數據保留一小段內部流,並且它不會從傳入流中讀取數據被再次取消。

我所試圖做的是:

protected async Task XmlToJsonStream(Stream instream, Stream outStream) 
    { 
     XmlReaderSettings readerSettings = new XmlReaderSettings(); 
     readerSettings.IgnoreWhitespace = false; 
     var reader = XmlReader.Create(instream, readerSettings); 
     var jsonWriter = new JsonTextWriter(new StreamWriter(outStream)); 
     jsonWriter.WriteStartObject(); 

     while (await reader.ReadAsync()) 
     { 
      jsonWriter.writeReader(reader); 
     } 
     jsonWriter.WriteEndObject(); 
     jsonWriter.Flush(); 
    } 
    protected async Task XmlFilterStream(Stream instream, Stream outStream) 
    { 
     XmlReaderSettings readerSettings = new XmlReaderSettings(); 
     readerSettings.IgnoreWhitespace = false; 
     var reader = XmlReader.Create(instream, readerSettings); 
     var writer = XmlWriter.Create(outStream, new XmlWriterSettings { Async = true, CloseOutput = false }) 

     while (reader.Read()) 
     { 
      writer.writeReader(reader); 
     } 


    } 

,但我不知道如何把它掛。

var incomingStream = ... 
var outgoingStream = ... 
var temp=... 
XmlFilterStream(incomingStream,temp); 
XmlToJsonStream(temp,outgoingstream); 

因爲如果我使用MemoryStream作爲臨時存儲器,它是不是隻是在最後纔將它全部存儲在流中。尋找在讀取數據時丟棄數據的流。

以上所有僅僅是示例代碼,缺少一些配置和尋找原因,但我希望我能夠設法說明我要做什麼。爲了能夠基於設置在即時複製流之間即插即用,進行xml過濾並可選地將其轉換爲json。

回答

2

流是字節的序列,因此流轉換將類似於Func<ArraySegment<byte>, ArraySegment<byte>>。然後,您可以將其應用於流式傳輸:

async Task TransformAsync(this Stream source, Func<ArraySegment<byte>, ArraySegment<byte>> transform, Stream destination, int bufferSize = 1024) 
{ 
    var buffer = new byte[bufferSize]; 
    while (true) 
    { 
    var bytesRead = await source.ReadAsync(buffer, 0, bufferSize); 
    if (bytesRead == 0) 
     return; 
    var bytesToWrite = transform(new ArraySegment(buffer, 0, bytesRead)); 
    if (bytesToWrite.Count != 0) 
     await destination.WriteAsync(bytesToWrite.Buffer, bytesToWrite.Offset, bytesToWrite.Count); 
    } 
} 

這比這更復雜一些,但這是一般想法。它需要一些邏輯來確保WriteAsync寫入所有字節;除了transform方法外,通常還需要一個「刷新」方法,該方法在源流結束時調用,因此轉換算法有最後機會返回其最終數據以寫入輸出流。

如果你想要其他的東西,如XML或JSON類型,那麼你可能會更好地與Reactive Extensions

+0

我看到這種方式更乾淨,但使用流在我的例子中可以使用XmlReader/Writer和JsonReader/Writers進行轉換。我會更多地考慮設計。 – 2015-02-07 15:21:23

+0

@PoulK.Sørensen你有沒有想過什麼?我處於類似的情況。試圖使用SqlClient Streaming,它只需要'流'作爲參數,但我想壓縮原始數據流進入數據庫。所以基本上我想用'GZipStream'封裝原始流(即'FileStream'),這樣每次SqlClient Streaming可能啓動一個'ReadAsync',我的包裝流將首先從基礎流讀取數據,壓縮數據然後返回壓縮字節到SqlClient流媒體。 – Terry 2016-10-31 15:56:46

0

我不知道我完全理解你的問題,但我想你是問你如何在輸入流上操作,而不是先將它完全加載到內存中。

在這種情況下,你不想做做這樣的事情:

var ms = new MemoryStream(); 
incomingStream.CopyTo(ms); 

確實負載的整個輸入流incomingStream到內存中 - 爲ms

從我所看到的,你的XmlFilterStream方法似乎是多餘的,即XmlToJsonStreamXmlFilterStream無論如何做的一切。

爲什麼不只是有:

protected async Task XmlToJsonStream(Stream instream, Stream outStream) 
{ 
    XmlReaderSettings readerSettings = new XmlReaderSettings(); 
    readerSettings.IgnoreWhitespace = false; 
    var reader = XmlReader.Create(instream, readerSettings); 
    var jsonWriter = new JsonTextWriter(new StreamWriter(outStream)); 
    jsonWriter.WriteStartObject(); 

    while (await reader.ReadAsync()) 
    { 
     jsonWriter.writeReader(reader); 
    } 
    jsonWriter.WriteEndObject(); 
    jsonWriter.Flush(); 
} 

,並調用它是這樣的:

var incomingStream = ... 
var outgoingStream = ... 
XmlToJsonStream(incomingStream ,outgoingstream); 

如果答案是你省略XmlFilterStream一些重要的細節,然後,沒有看到這些細節,我會建議您只將這些功能集成到一個XmlToJsonStream功能中。

+0

對不起,遺漏了很多。 XmlFilterStream不僅僅是讀/寫,它會讀取xml,(if(Include(reader){writer(reader))。但是我明白了你的觀點 – 2015-02-07 15:17:24