Mercurial > templog
diff rust/src/params.rs @ 627:d5075136442f rust
futures await
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 06 Feb 2018 22:16:44 +0800 |
parents | efcbe0d3afd6 |
children | 3e5e52d50af5 |
line wrap: on
line diff
--- a/rust/src/params.rs Wed Dec 06 00:09:45 2017 +0800 +++ b/rust/src/params.rs Tue Feb 06 22:16:44 2018 +0800 @@ -1,5 +1,5 @@ extern crate tokio_core; -extern crate futures; +extern crate futures_await as futures; extern crate rand; extern crate serde_json; extern crate base64; @@ -14,6 +14,8 @@ use std::cell::{Cell,RefCell}; use std::fs::File; use std::io::Read; +use futures::prelude::*; + use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; @@ -93,7 +95,7 @@ let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); let mut req = hyper::client::Request::new(hyper::Method::Get, uri); { - let mut headers = req.headers_mut(); + let headers = req.headers_mut(); headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); } hyper::client::Client::new(&self.handle).request(req) @@ -137,57 +139,38 @@ }); } - fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { - let resp = rself.make_request(); + #[async_stream(item = Params)] + pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { + let rcself = Rc::new(ParamWaiter::new(config, handle)); - let s = resp.map_err(|e| TemplogError::new_hyper("response", e)) - .and_then(move |r| { - let status = r.status(); - r.body().concat2() - .map_err(|e| TemplogError::new_hyper("body", e)) - .and_then(move |b| { - rself.handle_response(b, status) - }) - }); - Box::new(s) + let dur = Duration::from_millis(4000); + #[async] + for _ in Interval::new(dur, &rcself.handle).unwrap() { + // fetch params + // TODO - skip if inflight. + let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?; + let status = r.status(); + let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?; + if let Ok(params) = rcself.handle_response(b, status) { + stream_yield!(params); + } + } + Ok(()) } - fn new(config: &Config, handle: &Handle) -> Self { + fn new(config: Config, handle: Handle) -> Self { let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b); let epoch = base64::encode(&b); + ParamWaiter { limitlog: NotTooOften::new(LOG_MINUTES*60), last_etag: RefCell::new(String::new()), epoch: epoch, - config: config.clone(), - handle: handle.clone(), + config: config, + handle: handle, } } - pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { - let rcself = Rc::new(ParamWaiter::new(config, handle)); - - let dur = Duration::from_millis(4000); - let i = Interval::new(dur, &rcself.handle).unwrap() - .map_err(|e| TemplogError::new_io("interval failed", e)) - .and_then(move |()| { - Self::step(rcself.clone()) - }); - - // TODO use consume_errors() instead once "impl trait" is stable - //Box::new(consume_errors(i)) - let f = i.then(|r| { - match r { - Ok(v) => Ok(Some(v)), - Err(e) => { - debug!("Params stream error: {}", e); - Ok(None) - } - } - }) - .filter_map(|p| p); - Box::new(f) - } }