2012-09-13 55 views
12

我想要做的是創建一種「管道」(就像進程之間的管道),但在同一個程序中的C++ iostream之間。我有一個函數需要輸入流作爲參數,但我的數據來自輸出流。那麼是否有一種標準的方法將std::ostream的輸出傳遞給std::istream的輸入?C++連接輸出流到輸入流

+2

std :: stringstream是否適合您的需要?如果不是,請解釋原因。 – AProgrammer

+1

有一個iostream(注意它有一個'i'和一個'o'在開頭)。你在一端收集數據,並在那裏結束。那是你要的嗎。 –

+0

-1未指定的問題。 –

回答

13

您可以創建一個std::streambuf,其中輸出轉到一個緩衝區,當緩衝區變滿時,可以創建std::overflow()區塊。另一方面,當緩衝器變空時,你會有一個輸入緩衝器,它在underflow()上阻塞。顯然,閱讀和寫作將有兩個不同的線索。

棘手的事情是如何同步兩個緩衝區:在訪問緩衝區時,流不使用任何同步操作。只有當任何虛函數被調用時,您纔可以攔截操作並處理同步。另一方面,不使用緩衝區效率相當低。我要解決這個問題的方法是使用一個相對較小的輸出緩衝區(例如256 char s),並覆蓋sync()以使用此功能將字符傳輸到輸入緩衝區。 streambuf將使用互斥量進行同步,並使用條件變量阻止輸出上的完整輸入緩衝區和輸入上的空輸入緩衝區。爲了支持乾淨關閉,還應該有一個函數設置一個標誌,表示不再有輸入,所有進一步的輸出操作都會失敗。

創建實際的實現表明,兩個緩衝區是不夠的:訪問輸入和輸出緩衝區的線程可能在其他緩衝區阻塞時處於活動狀態。因此,需要第三個中間緩衝器。通過對上述計劃的小改動,下面是一些代碼(它使用微小的緩衝區來確保實際上溢出和下溢;對於真正的使用,至少輸入緩衝區應該可能更大)。

// threadbuf.cpp              -*-C++-*- 
// ---------------------------------------------------------------------------- 
// Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de   
//                  
// Permission is hereby granted, free of charge, to any person   
// obtaining a copy of this software and associated documentation  
// files (the "Software"), to deal in the Software without restriction, 
// including without limitation the rights to use, copy, modify,   
// merge, publish, distribute, sublicense, and/or sell copies of   
// the Software, and to permit persons to whom the Software is   
// furnished to do so, subject to the following conditions:    
//                  
// The above copyright notice and this permission notice shall be  
// included in all copies or substantial portions of the Software.  
//                  
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,  
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES  
// OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND    
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT   
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,   
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR   
// OTHER DEALINGS IN THE SOFTWARE. 
// ---------------------------------------------------------------------------- 


#include <algorithm> 
#include <condition_variable> 
#include <iostream> 
#include <mutex> 
#include <stdexcept> 
#include <streambuf> 
#include <string> 
#include <thread> 

// ---------------------------------------------------------------------------- 

class threadbuf 
    : public std::streambuf 
{ 
private: 
    typedef std::streambuf::traits_type traits_type; 
    typedef std::string::size_type  string_size_t; 

    std::mutex    d_mutex; 
    std::condition_variable d_condition; 
    std::string    d_out; 
    std::string    d_in; 
    std::string    d_tmp; 
    char*     d_current; 
    bool     d_closed; 

public: 
    threadbuf(string_size_t out_size = 16, string_size_t in_size = 64) 
     : d_out(std::max(string_size_t(1), out_size), ' ') 
     , d_in(std::max(string_size_t(1), in_size), ' ') 
     , d_tmp(std::max(string_size_t(1), in_size), ' ') 
     , d_current(&this->d_tmp[0]) 
     , d_closed(false) 
    { 
     this->setp(&this->d_out[0], &this->d_out[0] + this->d_out.size() - 1); 
     this->setg(&this->d_in[0], &this->d_in[0], &this->d_in[0]); 
    } 
    void close() 
    { 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      this->d_closed = true; 
      while (this->pbase() != this->pptr()) { 
       this->internal_sync(lock); 
      } 
     } 
     this->d_condition.notify_all(); 
    } 

private: 
    int_type underflow() 
    { 
     if (this->gptr() == this->egptr()) 
     { 
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      while (&this->d_tmp[0] == this->d_current && !this->d_closed) { 
       this->d_condition.wait(lock); 
      } 
      if (&this->d_tmp[0] != this->d_current) { 
       std::streamsize size(this->d_current - &this->d_tmp[0]); 
       traits_type::copy(this->eback(), &this->d_tmp[0], 
            this->d_current - &this->d_tmp[0]); 
       this->setg(this->eback(), this->eback(), this->eback() + size); 
       this->d_current = &this->d_tmp[0]; 
       this->d_condition.notify_one(); 
      } 
     } 
     return this->gptr() == this->egptr() 
      ? traits_type::eof() 
      : traits_type::to_int_type(*this->gptr()); 
    } 
    int_type overflow(int_type c) 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     if (!traits_type::eq_int_type(c, traits_type::eof())) { 
      *this->pptr() = traits_type::to_char_type(c); 
      this->pbump(1); 
     } 
     return this->internal_sync(lock) 
      ? traits_type::eof() 
      : traits_type::not_eof(c); 
    } 
    int sync() 
    { 
     std::unique_lock<std::mutex> lock(this->d_mutex); 
     return this->internal_sync(lock); 
    } 
    int internal_sync(std::unique_lock<std::mutex>& lock) 
    { 
     char* end(&this->d_tmp[0] + this->d_tmp.size()); 
     while (this->d_current == end && !this->d_closed) { 
      this->d_condition.wait(lock); 
     } 
     if (this->d_current != end) 
     { 
      std::streamsize size(std::min(end - d_current, 
              this->pptr() - this->pbase())); 
      traits_type::copy(d_current, this->pbase(), size); 
      this->d_current += size; 
      std::streamsize remain((this->pptr() - this->pbase()) - size); 
      traits_type::move(this->pbase(), this->pptr(), remain); 
      this->setp(this->pbase(), this->epptr()); 
      this->pbump(remain); 
      this->d_condition.notify_one(); 
      return 0; 
     } 
     return traits_type::eof(); 
    } 
}; 

// ---------------------------------------------------------------------------- 

static void writer(std::ostream& out) 
{ 
    for (std::string line; std::getline(std::cin, line);) 
    { 
     out << "writer: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

static void reader(std::istream& in) 
{ 
    for (std::string line; std::getline(in, line);) 
    { 
     std::cout << "reader: '" << line << "'\n"; 
    } 
} 

// ---------------------------------------------------------------------------- 

int main() 
{ 
    try 
    { 
     threadbuf sbuf; 
     std::ostream out(&sbuf); 
     std::istream in(&sbuf); 

     std::thread write(&::writer, std::ref(out)); 
     std::thread read(&::reader, std::ref(in)); 

     write.join(); 
     sbuf.close(); 
     read.join(); 
    } 
    catch (std::exception const& ex) 
    { 
     std::cerr << "ERROR: " << ex.what() << "\n"; 
    } 
} 
+1

+1; OP當然在尋找更快捷簡單的解決方案。 – Walter

+0

好吧,使用上面的代碼**對於snybody而言是**快而我:)以前我見過類似的請求,也就是說,它可能對其他人有用。 ...這是一個有趣的練習,才能真正實現我之前僅列出的內容。最後:我不知道更容易修復! –

+0

你是一個翻轉的傳說Dietmar。我在一個單元測試中使用它,它工作得很好。謝謝。 – MattSmith