2016-07-06 54 views
0

在 boost::asio 中,對已調整大小的 streambuf 繼續使用同一個 istream

我正在使用 boost asio 的 async_read() 從套接字讀取數據。我的讀取器類有以下成員,它們在程序的整個生命週期內都一直存在:

// Buffer that the async_read call shown further down reads the received bytes into.
boost::asio::streambuf recv_data; 
// Input stream constructed once on top of recv_data; it stores a pointer to the
// streambuf object itself, not to the storage the streambuf manages internally.
std::istream stream_is(&recv_data); 

async_read 的調用看起來是這樣的:

// Start an asynchronous read of exactly n bytes from the socket into recv_data.
// When the read finishes, IProtocol::handle_data_recv is invoked on `protocol`
// with the resulting error code.
boost::asio::async_read(ep_ptr->get_sock(), recv_data, boost::asio::transfer_exactly(n), 
           boost::bind(&IProtocol::handle_data_recv, &protocol, 
              boost::asio::placeholders::error)); 

我的問題是:如果我要讀取 'n' 個字節,而 streambuf 的大小小於 'n',因此它自行調整了大小,這時會發生什麼?我是否需要重新創建 std::istream,因爲 std::istream 所持有的 streambuf 內部緩衝區此時可能已經被釋放/重新分配?

+1

半相關的題外話:你可能會想這樣寫:boost::bind(&IProtocol::handle_data_recv, protocol.shared_from_this(), boost::asio::placeholders::error),或用類似的方式來保證對象的生命週期;不過視程序的結構而定,也許你已經隱式地保證了生命週期。這當然會迫使你使用 shared_ptr,並從 enable_shared_from_this<> 派生。 – M2tM

+0

很好的觀察,謝謝 - 但正如你所提到的,協議對象也存在於程序的整個生命週期中。所有這些都是在調用 io_service.run() 之前在 main() 中創建的。 – Sid

回答

1

不需要。謝天謝地,這種綁定並不在意 recv_data 的內部存儲是否可能被重新分配,它綁定的是 recv_data 對象本身。下面是我寫的下載程序的一個可運行示例,您可以看到,我在各次讀取之間並沒有重新創建緩衝區。

同樣的道理,你可以安全地共享一個 vector 的引用,而不必關心 vector 的內部存儲是否被重新分配(除非你直接指向 vector 元素的內存地址,或者在迭代器失效之後仍然持有它們)。vector 的句柄始終保持有效;同樣地,streambuf 的句柄對 istream 而言也始終保持有效,並且可以正常工作。

download.h

#ifndef _MV_DOWNLOAD_H_ 
#define _MV_DOWNLOAD_H_ 

#include <string> 
#include <iostream> 
#include <istream> 
#include <ostream> 
#include <fstream> 
#include <algorithm> 
#include "Network/url.h" 
#include "Utility/generalUtility.h" 
#include <boost/asio.hpp> 
#include <boost/bind.hpp> 

namespace MV { 
    /// Parsed head of an HTTP response: status line, header fields and the
    /// bookkeeping DownloadRequest attaches to it (redirect history, outcome).
    struct HttpHeader {
     /// First token of the status line, as read from the stream.
     std::string version;
     /// Numeric status code; 0 when it could not be parsed.
     int status = 0;
     /// Remainder of the status line after the status code.
     std::string message;
     /// Header fields keyed by lower-cased field name.
     std::map<std::string, std::string> values;

     /// URLs visited before the current one while following redirects.
     std::vector<std::string> bounces;

     /// Outcome flag and error text maintained by DownloadRequest.
     bool success = false;
     std::string errorMessage;

     /// Value of the Content-Length field. Initialized to 0 so that code reading
     /// it for a response without that field never sees an indeterminate value.
     size_t contentLength = 0;

     HttpHeader() = default;

     /// Constructs the header by parsing it from response_stream via read().
     HttpHeader(std::istream& response_stream) {
      read(response_stream);
     }

     /// Parses the status line and header fields from response_stream.
     void read(std::istream& response_stream);
    };


    /// Writes a human-readable dump of obj to os: the status line fields, every
    /// header field, then the list of redirect bounces. The stream is flushed
    /// at the end.
    inline std::ostream& operator<<(std::ostream& os, const HttpHeader& obj) {
     // Banner and status line fields.
     os << "\\/______HTTP_HEADER______\\/\nVersion [" << obj.version;
     os << "] Status [" << obj.status;
     os << "] Message [" << obj.message << "]\n";
     os << "||-----------------------||\n";

     // One "[name]: value" line per header field.
     for (auto field = obj.values.begin(); field != obj.values.end(); ++field) {
      os << "[" << field->first << "]: " << field->second << "\n";
     }

     // Numbered redirect history.
     os << "\n||--------Bounces--------||\n";
     size_t hop = 0;
     for (const auto& bounce : obj.bounces) {
      os << hop << ": " << bounce << "\n";
      ++hop;
     }

     os << "/\\_______________________/\\" << std::endl;
     return os;
    }
    inline std::istream& operator>>(std::istream& a_is, HttpHeader& a_obj) { 
     a_obj.read(a_is); 
     return a_is; 
    } 

    class DownloadRequest : public std::enable_shared_from_this<DownloadRequest> { 
    public: 
     static std::shared_ptr<DownloadRequest> make(const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput) { 
      auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput)); 
      result->perform(a_url); 
      return result; 
     } 

     //onComplete is called on success or error at the end of the download. 
     static std::shared_ptr<DownloadRequest> make(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::shared_ptr<std::ostream> &a_streamOutput, std::function<void (std::shared_ptr<DownloadRequest>)> a_onComplete) { 
      auto result = std::shared_ptr<DownloadRequest>(new DownloadRequest(a_streamOutput)); 
      result->onComplete = a_onComplete; 
      result->ioService = a_ioService; 
      result->perform(a_url); 
      return result; 
     } 

     HttpHeader& header() { 
      return headerData; 
     } 

     MV::Url finalUrl() { 
      return currentUrl; 
     } 

     MV::Url inputUrl() { 
      return originalUrl; 
     } 

    private: 
     DownloadRequest(const std::shared_ptr<std::ostream> &a_streamOutput) : 
      streamOutput(a_streamOutput) { 
     } 

     void perform(const MV::Url& a_url); 

     bool initializeSocket(); 

     void initiateRequest(const MV::Url& a_url); 

     void handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator); 
     void handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator); 
     void handleWriteRequest(const boost::system::error_code& err); 
     void handleReadHeaders(const boost::system::error_code& err); 
     void handleReadContent(const boost::system::error_code& err); 

     void readResponseToStream() { 
      (*streamOutput) << &(*response); 
     } 

     std::shared_ptr<boost::asio::io_service> ioService; 
     std::unique_ptr<boost::asio::ip::tcp::resolver> resolver; 
     std::unique_ptr<boost::asio::ip::tcp::socket> socket; 

     std::unique_ptr<std::istream> responseStream; 

     std::unique_ptr<boost::asio::streambuf> request; 
     std::unique_ptr<boost::asio::streambuf> response; 

     std::shared_ptr<std::ostream> streamOutput; 

     HttpHeader headerData; 

     MV::Url currentUrl; 
     MV::Url originalUrl; 

     std::function<void(std::shared_ptr<DownloadRequest>)> onComplete; 
    }; 

    /// Downloads a_url into memory and returns the response body, or an empty
    /// string when the request's header is not marked successful.
    std::string DownloadString(const MV::Url& a_url); 

    /// Downloads a_url to the file a_path, creating the parent directories first.
    /// Returns the response header; the file is removed again when the header is
    /// not marked successful.
    HttpHeader DownloadFile(const MV::Url& a_url, const std::string &a_path); 
    /// Same as above, but the request is created with a_ioService. a_onComplete
    /// (if set) is called with the request once it completes, after the file has
    /// been removed on failure.
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>()); 

    /// Downloads every URL on an io_service created and run internally. Each file
    /// is written to a_path with the file name of the URL's path appended directly
    /// (plain string concatenation, so a_path presumably needs a trailing separator).
    void DownloadFiles(const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>()); 
    /// Queues a download for every URL using a_ioService; destination names are
    /// built as in the overload above. a_onComplete is the per-file callback and
    /// a_onAllComplete is meant to fire once every download has completed.
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete = std::function<void(std::shared_ptr<DownloadRequest>)>(), std::function<void()> a_onAllComplete = std::function<void()>()); 
} 
#endif 

download.cpp

#include "download.h" 
#include <boost/filesystem.hpp> 
#include <atomic> 

namespace MV{ 
    /// Parses an HTTP response head from response_stream: the status line
    /// (version, status code, message) followed by header fields up to the first
    /// empty line. Field names are stored lower-cased in `values`; a
    /// Content-Length field additionally updates `contentLength`.
    void HttpHeader::read(std::istream& response_stream) {
     // Reset per-response state. The same HttpHeader is re-read for every
     // redirect hop, so a Content-Length from an earlier response must not
     // survive into one that does not send the field.
     values.clear();
     contentLength = 0;

     response_stream >> version;
     std::string status_code;
     response_stream >> status_code;
     try {
      status = std::stoi(status_code);
     } catch (...) {
      // Missing or non-numeric status code.
      status = 0;
     }

     // The rest of the status line is the message; drop the separating space.
     getline_platform_agnostic(response_stream, message);
     if (!message.empty() && message[0] == ' ') { message = message.substr(1); }

     std::string header;
     while (getline_platform_agnostic(response_stream, header) && !header.empty()) {
      const auto index = header.find(':');
      if (index == std::string::npos || index == 0) {
       continue; // not a "name: value" line
      }

      auto key = header.substr(0, index);
      // std::tolower requires a value representable as unsigned char.
      std::transform(key.begin(), key.end(), key.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });

      // Whitespace after the colon is optional, so skip however much is there
      // instead of assuming exactly one space ("Key:value" used to lose its
      // first value character).
      const auto valueStart = header.find_first_not_of(" \t", index + 1);
      const std::string value = (valueStart == std::string::npos) ? std::string() : header.substr(valueStart);
      values[key] = value;

      if (key == "content-length") {
       try {
        // A negative length would wrap to a huge size_t; treat it as 0.
        const long long parsed = std::stoll(value);
        contentLength = parsed > 0 ? static_cast<size_t>(parsed) : 0;
       } catch (const std::exception &e) {
        std::cerr << e.what() << std::endl;
        contentLength = 0;
       }
      }
     }
    }

    /// Fetches a_url and returns the response body as a string; returns an empty
    /// string when the request's header is not marked successful.
    std::string DownloadString(const Url& a_url) {
     // The body is collected in memory. make() without an io_service runs the
     // request to completion before returning.
     const auto body = std::make_shared<std::stringstream>();
     const bool succeeded = DownloadRequest::make(a_url, body)->header().success;
     return succeeded ? body->str() : std::string();
    }

    /// Downloads a_url into the file a_path (parent directories are created as
    /// needed) and returns the response header. The file is deleted again when
    /// the header is not marked successful.
    MV::HttpHeader DownloadFile(const Url& a_url, const std::string &a_path) {
     HttpHeader result;
     {
      // Inner scope: the request and the file stream are released here, before
      // the file is possibly removed below.
      const boost::filesystem::path destination(a_path);
      boost::filesystem::create_directories(destination.parent_path());
      auto fileStream = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
      auto download = DownloadRequest::make(a_url, fileStream);
      result = download->header();
     }

     const bool failed = !result.success;
     if (failed) {
      std::remove(a_path.c_str());
     }
     return result;
    }

    /// Downloads a_url into the file a_path (parent directories are created as
    /// needed) using a_ioService. When the request completes, the file is
    /// removed if the header is not marked successful, and then a_onComplete
    /// (if set) is called with the request.
    void DownloadFile(const std::shared_ptr<boost::asio::io_service> &a_ioService, const MV::Url& a_url, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
     boost::filesystem::create_directories(boost::filesystem::path(a_path).parent_path());
     auto outFile = std::make_shared<std::ofstream>(a_path, std::ofstream::out | std::ofstream::binary);
     // The returned request is not needed here: its pending handlers hold
     // shared_from_this(), which keeps it alive.
     DownloadRequest::make(a_ioService, a_url, outFile, [a_path, a_onComplete, outFile](std::shared_ptr<DownloadRequest> a_result) {
      if (!a_result->header().success) {
       // Close before removing, as the synchronous overload does by scoping;
       // removing a file that is still open fails on some platforms.
       outFile->close();
       std::remove(a_path.c_str());
      }
      if (a_onComplete) { a_onComplete(a_result); }
     });
    }

    /// Downloads every URL in a_urls on one internally created io_service and
    /// returns once the service has run out of work. Each destination is a_path
    /// with the file name of the URL's path appended.
    void DownloadFiles(const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete) {
     const auto sharedService = std::make_shared<boost::asio::io_service>();

     // Queue all downloads first, then run them together.
     for (auto current = a_urls.begin(); current != a_urls.end(); ++current) {
      const std::string fileName = boost::filesystem::path(current->path()).filename().string();
      DownloadFile(sharedService, *current, a_path + fileName, a_onComplete);
     }

     sharedService->run();
    }
    /// Queues a download for every URL in a_urls on a_ioService. Each
    /// destination is a_path with the file name of the URL's path appended.
    /// a_onComplete (if set) is called for each finished request; a_onAllComplete
    /// (if set) is called once, after the last request has finished.
    /// NOTE(review): with an empty a_urls no request is created, so
    /// a_onAllComplete is never invoked.
    void DownloadFiles(const std::shared_ptr<boost::asio::io_service> &a_ioService, const std::vector<MV::Url>& a_urls, const std::string &a_path, std::function<void(std::shared_ptr<DownloadRequest>)> a_onComplete, std::function<void()> a_onAllComplete) {
     const size_t totalFiles = a_urls.size();
     // One counter shared by all per-file callbacks. It has to live outside the
     // loop: a counter per file only ever reaches 1, so the "all complete"
     // callback could fire only for single-file batches.
     auto finished = std::make_shared<std::atomic<size_t>>(0);
     for (auto&& url : a_urls) {
      DownloadFile(a_ioService, url, a_path + boost::filesystem::path(url.path()).filename().string(), [=](std::shared_ptr<DownloadRequest> a_request) {
       // Both callbacks default to empty std::function objects; invoking an
       // empty one throws std::bad_function_call.
       if (a_onComplete) { a_onComplete(a_request); }
       if (++(*finished) == totalFiles && a_onAllComplete) {
        a_onAllComplete();
       }
      });
     }
    }

    /// Completion handler for the body read started in handleReadHeaders.
    /// Flushes the received data to the output stream and reports completion;
    /// any error other than end-of-file marks the request as failed.
    void DownloadRequest::handleReadContent(const boost::system::error_code& err) {
     if (!err || err == boost::asio::error::eof) {
      // End-of-file means the server closed the connection (the request asks
      // for "Connection: close"). Data that arrived before the close is still
      // in the buffer and must be written out, and the caller must still be
      // notified. Previously this case did neither.
      // NOTE(review): a connection closed before Content-Length bytes arrived
      // is reported like a normal completion; confirm whether that should
      // count as a failure.
      if (response->size() > 0) {
       // Inserting from an empty streambuf would set failbit on the output.
       readResponseToStream();
      }
      if (onComplete) { onComplete(shared_from_this()); }
     } else {
      headerData.success = false;
      headerData.errorMessage = "Download Read Content Failure: " + err.message();
      std::cerr << headerData.errorMessage << std::endl;
      if (onComplete) { onComplete(shared_from_this()); }
     }
    }

    /// Completion handler for the header read. Parses the response head, follows
    /// a redirect if one is indicated (at most 32 bounces), and otherwise writes
    /// the body bytes already buffered and schedules a read for the rest.
    void DownloadRequest::handleReadHeaders(const boost::system::error_code& err) {
     if (err) {
      headerData.success = false;
      headerData.errorMessage = "Download Read Header Failure: " + err.message();
      std::cerr << headerData.errorMessage << std::endl;
      if (onComplete) { onComplete(shared_from_this()); }
      return;
     }

     responseStream = std::make_unique<std::istream>(&(*response));

     headerData.read(*responseStream);
     headerData.success = true;
     headerData.errorMessage = "";

     const auto location = headerData.values.find("location");
     const bool isRedirect = headerData.status >= 300 && headerData.status < 400 && headerData.bounces.size() < 32 && location != headerData.values.end();
     if (isRedirect) {
      headerData.bounces.push_back(currentUrl.toString());
      initiateRequest(location->second);
      return;
     }

     // Whatever is left in the buffer after parsing the head is body data that
     // arrived together with the headers.
     const size_t buffered = response->size();
     const bool hasLength = headerData.values.find("content-length") != headerData.values.end();
     // Guarded subtraction: the unguarded form wrapped to a huge value whenever
     // more was buffered than Content-Length announced.
     const size_t amountLeftToRead = (hasLength && headerData.contentLength > buffered) ? headerData.contentLength - buffered : 0;

     if (buffered > 0) {
      readResponseToStream();
     }

     if (!hasLength) {
      // Without Content-Length the body ends when the server closes the
      // connection, so read until the read fails. End-of-file is then the
      // normal end of the body; other errors go to the regular content handler.
      const auto self = shared_from_this();
      boost::asio::async_read(*socket, *response, boost::asio::transfer_all(), [self](const boost::system::error_code& readErr, std::size_t) {
       if (readErr == boost::asio::error::eof) {
        if (self->response->size() > 0) { self->readResponseToStream(); }
        if (self->onComplete) { self->onComplete(self); }
       } else {
        self->handleReadContent(readErr);
       }
      });
     } else if (amountLeftToRead > 0) {
      boost::asio::async_read(*socket, *response, boost::asio::transfer_at_least(amountLeftToRead), boost::bind(&DownloadRequest::handleReadContent, shared_from_this(), boost::asio::placeholders::error));
     } else {
      if (onComplete) { onComplete(shared_from_this()); }
     }
    }

    /// Completion handler for sending the request. On success starts reading the
    /// response up to the blank line that ends the header; on failure records
    /// the error and reports completion.
    void DownloadRequest::handleWriteRequest(const boost::system::error_code& err) {
     if (err) {
      headerData.success = false;
      headerData.errorMessage = "Download Write Failure: " + err.message();
      std::cerr << headerData.errorMessage << std::endl;
      if (onComplete) {
       onComplete(shared_from_this());
      }
      return;
     }

     // The header is complete once "\r\n\r\n" has been received.
     auto onHeaders = boost::bind(&DownloadRequest::handleReadHeaders, shared_from_this(), boost::asio::placeholders::error);
     boost::asio::async_read_until(*socket, *response, "\r\n\r\n", onHeaders);
    }

    /// Completion handler for a connection attempt. On success the request is
    /// sent. On failure the next resolved endpoint is tried, and once none are
    /// left the error is recorded and completion is reported.
    void DownloadRequest::handleConnect(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
     typedef boost::asio::ip::tcp::resolver::iterator EndpointIterator;

     if (!err) {
      // Connected: send the prepared request.
      auto onWritten = boost::bind(&DownloadRequest::handleWriteRequest, shared_from_this(), boost::asio::placeholders::error);
      boost::asio::async_write(*socket, *request, onWritten);
      return;
     }

     if (endpoint_iterator == EndpointIterator()) {
      // No endpoints left to try.
      headerData.success = false;
      headerData.errorMessage = "Download Connection Failure: " + err.message();
      std::cerr << headerData.errorMessage << std::endl;
      if (onComplete) {
       onComplete(shared_from_this());
      }
      return;
     }

     // This attempt failed: reset the socket and move on to the next endpoint.
     socket->close();
     const boost::asio::ip::tcp::endpoint nextEndpoint = *endpoint_iterator;
     ++endpoint_iterator;
     socket->async_connect(nextEndpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, endpoint_iterator));
    }

    /// Completion handler for host name resolution. On success starts connecting
    /// to the first endpoint; handleConnect walks the remaining ones if needed.
    /// On failure the error is recorded and completion is reported.
    void DownloadRequest::handleResolve(const boost::system::error_code& err, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) {
     if (err) {
      headerData.success = false;
      headerData.errorMessage = "Download Resolve Failure: " + err.message();
      std::cerr << headerData.errorMessage << std::endl;
      if (onComplete) {
       onComplete(shared_from_this());
      }
      return;
     }

     // Try the first endpoint and hand the advanced iterator to handleConnect
     // so it can continue with the next one if this attempt fails.
     const boost::asio::ip::tcp::endpoint firstEndpoint = *endpoint_iterator;
     ++endpoint_iterator;
     socket->async_connect(firstEndpoint, boost::bind(&DownloadRequest::handleConnect, shared_from_this(), boost::asio::placeholders::error, endpoint_iterator));
    }

    /// Starts (or restarts, for a redirect) the request for a_url: closes the
    /// socket, allocates fresh request/response buffers, writes the GET request
    /// into the request buffer and begins resolving the host for service "http".
    void DownloadRequest::initiateRequest(const MV::Url& a_url) {
     socket->close();
     currentUrl = a_url;

     // Fresh buffers for every request, so nothing from a previous hop remains.
     request = std::make_unique<boost::asio::streambuf>();
     response = std::make_unique<boost::asio::streambuf>();

     // Compose the request text.
     std::ostream requestStream(request.get());
     requestStream << "GET " << a_url.pathAndQuery() << " HTTP/1.1\r\n"
                   << "Host: " << a_url.host() << "\r\n"
                   << "Accept: */*\r\n"
                   << "Connection: close\r\n\r\n";

     boost::asio::ip::tcp::resolver::query hostQuery(a_url.host(), "http");
     auto onResolved = boost::bind(&DownloadRequest::handleResolve, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::iterator);
     resolver->async_resolve(hostQuery, onResolved);
    }

    /// Creates the resolver and socket on the request's io_service, creating the
    /// io_service first when none was supplied. Returns true when the service
    /// was created here, false when an existing one is used.
    bool DownloadRequest::initializeSocket() {
     const bool serviceCreatedHere = !ioService;
     if (serviceCreatedHere) {
      ioService = std::make_shared<boost::asio::io_service>();
     }

     resolver = std::make_unique<boost::asio::ip::tcp::resolver>(*ioService);
     socket = std::make_unique<boost::asio::ip::tcp::socket>(*ioService);

     return serviceCreatedHere;
    }

    /// Starts the download of a_url. When the request had to create its own
    /// io_service, that service is run here, so the call returns only after it
    /// has run out of work; otherwise the work is just queued on the supplied
    /// service. Any exception is converted into a failed header.
    void DownloadRequest::perform(const MV::Url& a_url) {
     originalUrl = a_url;
     try {
      const bool needToCallRun = initializeSocket();
      initiateRequest(a_url);
      if (needToCallRun) {
       ioService->run();
      }
     } catch (...) {
      headerData.success = false;
      headerData.errorMessage = "Exception thrown to top level.";
      std::cerr << headerData.errorMessage << std::endl;
      // onComplete is empty for requests created with the two-argument make();
      // calling it unguarded would throw std::bad_function_call out of this
      // handler.
      if (onComplete) { onComplete(shared_from_this()); }
     }
    }

} 

generalUtility.h(只是其中的一部分,僅供參考)

/// Reads one line from `is` into `t`, accepting "\n", "\r\n" and a lone "\r"
/// as line terminators. The terminator is consumed and not stored in `t`.
/// eofbit is set only when end of input is reached before any character of
/// the line was read, so a final line without a terminator is still returned
/// with the stream in a good state.
inline std::istream& getline_platform_agnostic(std::istream& is, std::string& t) {
    t.clear();

    // Characters are pulled straight from the stream buffer, which is faster
    // than going through the istream one character at a time. Direct streambuf
    // access has to be guarded by a sentry object, which takes care of tasks
    // such as thread synchronization and updating the stream state.
    std::istream::sentry guard(is, true);
    std::streambuf* const buffer = is.rdbuf();

    while (true) {
     const int next = buffer->sbumpc();

     if (next == '\n') {
      break;
     }
     if (next == '\r') {
      // Swallow the '\n' of a "\r\n" pair; a lone '\r' also ends the line.
      if (buffer->sgetc() == '\n') {
       buffer->sbumpc();
      }
      break;
     }
     if (next == EOF) {
      // A last line without a line ending is still a line; only flag end of
      // file when nothing at all was read.
      if (t.empty()) {
       is.setstate(std::ios::eofbit);
      }
      break;
     }

     t += static_cast<char>(next);
    }

    return is;
}

/// Returns a copy of s with every character converted by std::tolower.
inline std::string toLower(std::string s) {
    // std::tolower is only defined for EOF and values representable as
    // unsigned char, so a plain (possibly negative) char must not be passed
    // directly.
    std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    return s;
}

url.h

由以下文件稍作修改而來(只是改了一些命名風格方面的東西):

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/include/Poco/URI.h

https://github.com/keyz182/Poco-1.4.3/blob/master/Foundation/src/URI.cpp

相關問題