2017-08-16 70 views
0

我試圖通過使用靜態響應實現我自己的hyper::client::Connect來測試一些使用hyper::Client的代碼。我已經找到了類型,但無法找出運行時問題,tokio-proto抱怨說request/response mismatch。下面是我的代碼的簡化版本,展示了失敗:實現hyper :: client :: Connect進行測試

extern crate futures; 
extern crate hyper; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{future, Future, Stream}; 
use std::str::from_utf8; 
use std::io::Cursor; 

struct Client<'a, C: 'a> { 
    client: &'a hyper::Client<C>, 
    url: &'a str, 
} 

impl<'a, C: hyper::client::Connect> Client<'a, C> { 
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> { 
     Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| { 
      let body = Vec::new(); 
      res.body() 
       .fold(body, |mut acc, chunk| { 
        acc.extend_from_slice(chunk.as_ref()); 
        Ok::<_, hyper::Error>(acc) 
       }) 
       .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap()))) 
     })) 
    } 
} 

struct StaticConnector<'a> { 
    body: &'a [u8], 
} 

impl<'a> StaticConnector<'a> { 
    fn new(body: &'a [u8]) -> StaticConnector { 
     StaticConnector { body: body } 
    } 
} 

impl<'a> hyper::server::Service for StaticConnector<'a> { 
    type Request = hyper::Uri; 
    type Response = Cursor<Vec<u8>>; 
    type Error = std::io::Error; 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    fn call(&self, _: Self::Request) -> Self::Future { 
     Box::new(future::ok(Cursor::new(self.body.to_vec()))) 
    } 
} 

fn main() { 
    let mut core = tokio_core::reactor::Core::new().unwrap(); 
    let handle = core.handle(); 

    // My StaticConnector for testing 
    let hyper_client = hyper::Client::configure() 
     .connector(StaticConnector::new(
      b"\ 
       HTTP/1.1 200 OK\r\n\ 
       Content-Length: 8\r\n\ 
       \r\n\ 
       Maldives\ 
       ", 
     )) 
     .build(&handle); 

    // Real Connector 
    /* 
    let hyper_client = hyper::Client::configure().build(&handle); 
    */ 

    let client = Client { 
     client: &hyper_client, 
     url: "http://ifconfig.co/country", 
    }; 
    let result = core.run(client.get()).unwrap(); 
    println!("{}", result); 
} 

Playground

我猜這是我使用CursorIo這是不完整的在某些方面,但我沒能調試並取得進展。一個想法是寫入這Cursorhyper::Client大概使得不能按預期工作。也許我需要寫入的sink和讀取的靜態內容的組合?所有想法我都未能取得進展!

回答

0

原始代碼不起作用的原因是因爲讀者方在客戶端發送請求之前提供了響應,因此tokio-protorequest/response mismatch錯誤。修復結果不是微不足道的,首先我們需要安排讀者阻止,或者更具體地說,用std::io::ErrorKind::WouldBlock向事件循環指出還沒有任何東西,但不認爲它是EOF 。另外,一旦我們得到表明請求已被髮送的寫入,並且tokio-proto機器正在等待響應,我們使用futures::task::current.notify來解鎖讀取。這裏有一個更新的實現,按預期工作:

extern crate futures; 
extern crate hyper; 
extern crate tokio_core; 
extern crate tokio_io; 

use futures::{future, Future, Stream, task, Poll}; 
use std::str::from_utf8; 
use std::io::{self, Cursor, Read, Write}; 
use tokio_io::{AsyncRead, AsyncWrite}; 

struct Client<'a, C: 'a> { 
    client: &'a hyper::Client<C>, 
    url: &'a str, 
} 

impl<'a, C: hyper::client::Connect> Client<'a, C> { 
    fn get(&self) -> Box<Future<Item = String, Error = hyper::Error>> { 
     Box::new(self.client.get(self.url.parse().unwrap()).and_then(|res| { 
      let body = Vec::new(); 
      res.body() 
       .fold(body, |mut acc, chunk| { 
        acc.extend_from_slice(chunk.as_ref()); 
        Ok::<_, hyper::Error>(acc) 
       }) 
       .and_then(move |value| Ok(String::from(from_utf8(&value).unwrap()))) 
     })) 
    } 
} 

struct StaticStream { 
    wrote: bool, 
    body: Cursor<Vec<u8>>, 
} 

impl Read for StaticStream { 
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 
     if self.wrote { 
      self.body.read(buf) 
     } else { 
      Err(io::ErrorKind::WouldBlock.into()) 
     } 
    } 
} 

impl Write for StaticStream { 
    fn write<'a>(&mut self, buf: &'a [u8]) -> io::Result<usize> { 
     self.wrote = true; 
     task::current().notify(); 
     Ok(buf.len()) 
    } 

    fn flush(&mut self) -> io::Result<()> { 
     Ok(()) 
    } 
} 

impl AsyncRead for StaticStream {} 

impl AsyncWrite for StaticStream { 
    fn shutdown(&mut self) -> Poll<(), io::Error> { 
     Ok(().into()) 
    } 
} 

struct StaticConnector<'a> { 
    body: &'a [u8], 
} 

impl<'a> StaticConnector<'a> { 
    fn new(body: &'a [u8]) -> StaticConnector { 
     StaticConnector { body: body } 
    } 
} 

impl<'a> hyper::server::Service for StaticConnector<'a> { 
    type Request = hyper::Uri; 
    type Response = StaticStream; 
    type Error = std::io::Error; 
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>; 

    fn call(&self, _: Self::Request) -> Self::Future { 
     Box::new(future::ok(StaticStream { 
      wrote: false, 
      body: Cursor::new(self.body.to_vec()), 
     })) 
    } 
} 

fn main() { 
    let mut core = tokio_core::reactor::Core::new().unwrap(); 
    let handle = core.handle(); 

    // My StaticConnector for testing 
    let hyper_client = hyper::Client::configure() 
     .connector(StaticConnector::new(
      b"\ 
       HTTP/1.1 200 OK\r\n\ 
       Content-Length: 8\r\n\ 
       \r\n\ 
       Maldives\ 
       ", 
     )) 
     .build(&handle); 

    // Real Connector 
    /* 
    let hyper_client = hyper::Client::configure().build(&handle); 
    */ 

    let client = Client { 
     client: &hyper_client, 
     url: "http://ifconfig.co/country", 
    }; 
    let result = core.run(client.get()).unwrap(); 
    println!("{}", result); 
} 

Playground

注:此實現適用於簡單的情況,但我還沒有測試更復雜的情況。例如,我不確定的是有多大的請求/響應行爲,因爲它們可能涉及多於一個讀/寫呼叫。