Mercurial > templog
changeset 618:2d65a9f0bed3 rust
params returning stream of success
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 21 Mar 2017 22:35:58 +0800 |
parents | 87a78343140e |
children | aecd0a15133c |
files | rust/src/params.rs |
diffstat | 1 files changed, 28 insertions(+), 15 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/src/params.rs Tue Mar 21 19:13:49 2017 +0800 +++ b/rust/src/params.rs Tue Mar 21 22:35:58 2017 +0800 @@ -118,24 +118,32 @@ (req, buf) } - fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { + fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Params, TemplogError> { let text = String::from_utf8_lossy(buf).to_string(); let resp = req.response_code()?; match resp { 200 => { // new params let r: Response = serde_json::from_str(&text)?; - *self.last_etag.borrow_mut() = r.etag; - Ok(Some(r.params)) + let mut le = self.last_etag.borrow_mut(); + *le = r.etag; + if le.split('-').next().unwrap() == &self.epoch { + Ok(r.params) + } else { + Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) + } } - 304 => Ok(None), // unmodified (long polling timeout at the server) + 304 => { + // XXX this isn't really an error + Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) + }, _ => { Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) }, } } - fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { + fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { if rself.session.borrow().is_none() { *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); @@ -151,12 +159,6 @@ rself.handle_response(&mut rq, &result) }); Box::new(s) - - /* - let mut p = Params::defaults(); - p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); - future::ok(Some(p)).boxed() - */ } fn new(config: &Config, handle: &Handle) -> Self { @@ -181,10 +183,21 @@ .map_err(|e| TemplogError::new_io("interval failed", e)) .and_then(move |()| { Self::step(rcself.clone()) - }) - // throw away None params - .filter_map(|p| p); - Box::new(i) + }); + + // TODO use consume_errors() instead once "impl trait" is stable + //Box::new(consume_errors(i)) + let f = i.then(|r| { + match r { + Ok(v) => Ok(Some(v)), + Err(e) => { + debug!("Params stream error: {}", e); + Ok(None) + } + } + }) + .filter_map(|p| p); + Box::new(f) } }