changeset 618:2d65a9f0bed3 rust

params returning stream of success
author Matt Johnston <matt@ucc.asn.au>
date Tue, 21 Mar 2017 22:35:58 +0800
parents 87a78343140e
children aecd0a15133c
files rust/src/params.rs
diffstat 1 files changed, 28 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/rust/src/params.rs	Tue Mar 21 19:13:49 2017 +0800
+++ b/rust/src/params.rs	Tue Mar 21 22:35:58 2017 +0800
@@ -118,24 +118,32 @@
         (req, buf)
     }
 
-    fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
+    fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Params, TemplogError> {
         let text = String::from_utf8_lossy(buf).to_string();
         let resp = req.response_code()?;
         match resp {
             200 => {
                 // new params
                 let r: Response = serde_json::from_str(&text)?;
-                *self.last_etag.borrow_mut() = r.etag;
-                Ok(Some(r.params))
+                let mut le = self.last_etag.borrow_mut();
+                *le = r.etag;
+                if le.split('-').next().unwrap() == &self.epoch {
+                    Ok(r.params)
+                } else {
+                    Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch)))
+                }
             }
-            304 => Ok(None), // unmodified (long polling timeout at the server)
+            304 => {
+                // XXX this isn't really an error
+                Err(TemplogError::new("304 unmodified (long polling timeout at the server)"))
+            },
             _ => {
                 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
             },
         }
     }
 
-    fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
+    fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> {
 
         if rself.session.borrow().is_none() {
             *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
@@ -151,12 +159,6 @@
                 rself.handle_response(&mut rq, &result)
             });
         Box::new(s)
-
-            /*
-        let mut p = Params::defaults();
-        p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
-        future::ok(Some(p)).boxed()
-        */
     }
 
     fn new(config: &Config, handle: &Handle) -> Self {
@@ -181,10 +183,21 @@
             .map_err(|e| TemplogError::new_io("interval failed", e))
             .and_then(move |()| {
                 Self::step(rcself.clone())
-            })
-            // throw away None params
-            .filter_map(|p| p);
-        Box::new(i)
+            });
+
+        // TODO use consume_errors() instead once "impl trait" is stable
+        //Box::new(consume_errors(i))
+        let f = i.then(|r| {
+            match r {
+                Ok(v) => Ok(Some(v)),
+                Err(e) => {
+                    debug!("Params stream error: {}", e);
+                    Ok(None)
+                }
+            }
+        })
+        .filter_map(|p| p);
+        Box::new(f)
     }
 }