2017-07-14 76 views
1

我在 Iron handler 中創建一個客戶端請求。我如何重用 Tokio 的 Core 和 Hyper 的 Client?我使用 hyper 0.11.0 和 tokio-core 0.1。(標題:在 Iron 和 Hyper 中重用 hyper::Client 和 tokio_core::reactor::Core)

/// Iron handler that forwards the incoming request body as a JSON `POST` to
/// the upstream `getResult` endpoint and returns the collected upstream
/// results, serialized as JSON, with status `200 OK`.
///
/// # Errors
///
/// Returns an `IronError` with
/// * `400 Bad Request` if the incoming body cannot be read as UTF-8 text,
/// * `502 Bad Gateway` if the upstream request fails or a response chunk is
///   not valid JSON for `ResultFormat`,
/// * `500 Internal Server Error` if the reactor cannot be created or the
///   results cannot be serialized.
fn get_result(req: &mut Request) -> IronResult<Response> {
    let mut payload = String::new();
    req.body
        .read_to_string(&mut payload)
        .map_err(|e| iron::IronError::new(e, iron::status::BadRequest))?;

    // A fresh reactor and client are still created per request here; sharing
    // them across handler invocations needs a dedicated worker thread because
    // `Core` cannot be placed in a plain global.
    let mut core = tokio_core::reactor::Core::new()
        .map_err(|e| iron::IronError::new(e, iron::status::InternalServerError))?;
    let client = Client::new(&core.handle());

    let uri = "http://host:port/getResult"
        .parse()
        .expect("the upstream URI literal must be a valid URI");
    // Named differently from the incoming `req` so the two are not confused.
    let mut upstream_req: hyper::Request = hyper::Request::new(hyper::Method::Post, uri);
    upstream_req.headers_mut().set(ContentType::json());
    upstream_req
        .headers_mut()
        .set(ContentLength(payload.len() as u64));
    upstream_req.set_body(payload);

    let mut results: Vec<ResultFormat> = Vec::new();
    {
        // Scoped so the mutable borrow of `results` held by the future ends
        // before `results` is serialized below.
        let work = client.request(upstream_req).and_then(|res| {
            // NOTE(review): this parses every body chunk as one complete JSON
            // document, exactly like the original; chunk boundaries are not
            // guaranteed to line up with documents - confirm with upstream.
            res.body().for_each(|chunk| {
                let parsed: ResultFormat = serde_json::from_slice(&chunk).map_err(|e| {
                    ::std::io::Error::new(::std::io::ErrorKind::InvalidData, e)
                })?;
                results.push(parsed);
                Ok(())
            })
        });

        // Futures are lazy: without driving `work` on the reactor no request
        // is ever sent and `results` would stay empty.
        core.run(work)
            .map_err(|e| iron::IronError::new(e, iron::status::BadGateway))?;
    }

    let body = serde_json::to_string(&results)
        .map_err(|e| iron::IronError::new(e, iron::status::InternalServerError))?;
    Ok(Response::with((iron::status::Ok, body)))
}

回答

1

我創建了一個包裝客戶端和核心的Downloader類。下面是片段。

use hyper; 
use tokio_core; 
use std::sync::{mpsc}; 
use std::thread; 
use futures::Future; 
use futures::stream::Stream; 
use std::time::Duration; 
use std::io::{self, Write}; 
use time::precise_time_ns; 
use hyper::Client; 

/// Handle to a background thread that owns a single tokio `Core` and hyper
/// `Client`, so both are created once and reused for every download.
///
/// Requests are handed to the worker over an `mpsc` channel together with a
/// per-request sender on which the response body is streamed back.
pub struct Downloader {
    /// Queue of `(request, reply channel)` jobs consumed by the worker thread.
    sender: mpsc::Sender<(hyper::Request, mpsc::Sender<Vec<u8>>)>,
    // The handle is stored but never joined; the worker exits on its own
    // once `sender` is dropped.
    #[allow(dead_code)]
    tr: thread::JoinHandle<()>,
}

impl Downloader {
    /// Spawns the worker thread and returns a handle for submitting requests.
    ///
    /// The worker handles one request at a time: it takes a job, runs it to
    /// completion on its reactor, then waits for the next job.
    ///
    /// # Panics
    ///
    /// The worker thread panics if the tokio reactor cannot be created.
    pub fn new() -> Downloader {
        let (sender, receiver) =
            mpsc::channel::<(hyper::Request, mpsc::Sender<Vec<u8>>)>();
        let tr = thread::spawn(move || {
            let mut core = tokio_core::reactor::Core::new()
                .expect("failed to create the tokio reactor for the downloader thread");
            let client = Client::new(&core.handle());

            // `recv` fails only after every job sender is gone, i.e. the
            // `Downloader` was dropped; end the thread instead of panicking.
            while let Ok((req, results)) = receiver.recv() {
                let begin = precise_time_ns();
                let outcome = {
                    let work = client.request(req).and_then(|res| {
                        res.body().for_each(|chunk| {
                            // Copy the chunk into a plain `Vec<u8>` so callers
                            // of `download` do not depend on hyper's types. A
                            // send failure means the caller dropped its
                            // receiver; it aborts the rest of this download.
                            results
                                .send(chunk.to_vec())
                                .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
                            Ok(())
                        })
                    });
                    core.run(work)
                };
                if let Err(e) = outcome {
                    println!("Error Is {:?}", e);
                }
                // Covers the whole request because `core.run` blocks until
                // the body has been fully forwarded.
                debug!(
                    "Time taken In Download {:?} ms",
                    (precise_time_ns() - begin) / 1000000
                );
                // `results` is dropped here, which closes the caller's
                // receiving end and marks the end of the body.
            }
        });
        Downloader { sender, tr }
    }

    /// Queues `req` for download; body chunks are delivered on `results` in
    /// arrival order, and the channel is closed once the request finishes
    /// or fails.
    ///
    /// # Panics
    ///
    /// Panics if the worker thread is no longer running.
    pub fn download(&self, req: hyper::Request, results: mpsc::Sender<Vec<u8>>) {
        self.sender
            .send((req, results))
            .expect("downloader worker thread has terminated");
    }
}

現在這個類的客戶端可以有一個靜態變量。

// Single shared `Downloader`, wrapped in a `Mutex` and built by
// `Downloader::new()`, which spawns the worker thread.
// NOTE(review): `Mutex` must be in scope (`std::sync::Mutex`) - the import is
// not shown in this snippet.
lazy_static!{ 
    static ref DOWNLOADER : Mutex<downloader::Downloader> = 
Mutex::new(downloader::Downloader::new()); 
} 
// Per-request reply channel: `sender` is handed to the downloader, and the
// response body chunks are read back from `receiver`.
let (sender, receiver) = mpsc::channel(); 
// NOTE(review): `download` is declared with a `hyper::Request` as its first
// argument, so `payload` here presumably is the prepared request rather than
// the raw body string - confirm against the caller.
DOWNLOADER.lock().unwrap().download(payload, sender); 

然後通過接收通道(receiver)讀取結果。可能需要使用 drop(sender) 關閉發送端通道。