# HG changeset patch # User Matt Johnston # Date 1490106958 -28800 # Node ID 2d65a9f0bed33e1d27cea944d4f22ad4a5c3062d # Parent 87a78343140e0d229163250c45af52d5903f2b63 params returning stream of success diff -r 87a78343140e -r 2d65a9f0bed3 rust/src/params.rs --- a/rust/src/params.rs Tue Mar 21 19:13:49 2017 +0800 +++ b/rust/src/params.rs Tue Mar 21 22:35:58 2017 +0800 @@ -118,24 +118,32 @@ (req, buf) } - fn handle_response(&self, req: &mut Easy, buf: &Vec) -> Result, TemplogError> { + fn handle_response(&self, req: &mut Easy, buf: &Vec) -> Result { let text = String::from_utf8_lossy(buf).to_string(); let resp = req.response_code()?; match resp { 200 => { // new params let r: Response = serde_json::from_str(&text)?; - *self.last_etag.borrow_mut() = r.etag; - Ok(Some(r.params)) + let mut le = self.last_etag.borrow_mut(); + *le = r.etag; + if le.split('-').next().unwrap() == &self.epoch { + Ok(r.params) + } else { + Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) + } } - 304 => Ok(None), // unmodified (long polling timeout at the server) + 304 => { + // XXX this isn't really an error + Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) + }, _ => { Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) }, } } - fn step(rself: Rc) -> Box, Error=TemplogError>> { + fn step(rself: Rc) -> Box> { if rself.session.borrow().is_none() { *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); @@ -151,12 +159,6 @@ rself.handle_response(&mut rq, &result) }); Box::new(s) - - /* - let mut p = Params::defaults(); - p.fridge_setpoint = 17.0 + 4.0*rand::random::(); - future::ok(Some(p)).boxed() - */ } fn new(config: &Config, handle: &Handle) -> Self { @@ -181,10 +183,21 @@ .map_err(|e| TemplogError::new_io("interval failed", e)) .and_then(move |()| { Self::step(rcself.clone()) - }) - // throw away None params - .filter_map(|p| p); - Box::new(i) + }); + + // TODO use consume_errors() instead once "impl trait" is stable + //Box::new(consume_errors(i)) + let f = i.then(|r| { + match r { + Ok(v) => Ok(Some(v)), + Err(e) => { + debug!("Params stream error: {}", e); + Ok(None) + } + } + }) + .filter_map(|p| p); + Box::new(f) } }