changeset 615:f153aec221be rust

move Params, epoch code
author Matt Johnston <matt@ucc.asn.au>
date Tue, 07 Mar 2017 23:56:12 +0800
parents e1bab5b36352
children a85c0c9bc1fa
files rust/src/fridge.rs rust/src/main.rs rust/src/params.rs rust/src/paramwaiter.rs rust/src/types.rs
diffstat 5 files changed, 192 insertions(+), 201 deletions(-) [+]
line wrap: on
line diff
--- a/rust/src/fridge.rs	Tue Mar 07 23:04:02 2017 +0800
+++ b/rust/src/fridge.rs	Tue Mar 07 23:56:12 2017 +0800
@@ -11,7 +11,8 @@
 use tokio_core::reactor::{Timeout,Handle};
 use futures::sync::{mpsc};
 
-use ::Config;
+use config::Config;
+use params::Params;
 use types::*;
 
 #[derive(Debug)]
@@ -57,7 +58,7 @@
         let (s, r) = mpsc::channel(1);
         let mut f = Fridge { 
             config: config.clone(),
-            params: p,
+            params: p.clone(),
             temp_wort: None,
             temp_fridge: None,
 
--- a/rust/src/main.rs	Tue Mar 07 23:04:02 2017 +0800
+++ b/rust/src/main.rs	Tue Mar 07 23:56:12 2017 +0800
@@ -31,7 +31,7 @@
 mod sensor;
 mod fridge;
 mod types;
-mod paramwaiter;
+mod params;
 
 use types::*;
 use config::Config;
@@ -41,8 +41,9 @@
     let mut core = Core::new().unwrap();
     let handle = core.handle();
 
-    let mut paramh = ParamHolder::new();
-    let mut fridge = fridge::Fridge::new(&config, nowait, paramh.p, &handle);
+    let params = params::Params::load(&config);
+    let epoch = params.epoch.clone();
+    let mut fridge = fridge::Fridge::new(&config, nowait, params, &handle);
 
     let (fridge_reading_s, fridge_reading_r) = mpsc::channel(1);
     let fridge_reading_r = fridge_reading_r.map_err(|e| TemplogError::new("Problem with fridge_reading_r channel"));
@@ -71,7 +72,7 @@
         r
     });
 
-    let param_stream = paramwaiter::ParamWaiter::stream(config, &handle);
+    let param_stream = params::ParamWaiter::stream(config, epoch, &handle);
     let p = param_stream.map(|p| {
             fridge::Message::Params(p)
         });
@@ -99,7 +100,7 @@
   -d, --debug
   -t, --test    Use fake sensors etc
   --nowait      Skip initial fridge wait
-  --defconf     Print default config (customise in tempserver.conf)
+  --defconf     Print default config (customise in local.conf)
   --thisconf    Print used config
 ";
 
@@ -138,7 +139,7 @@
 
     if args.flag_defconf {
         println!("Default configuration:\n{}\n\n{}",
-            "(custom options go in tempserver.conf)",
+            "(custom options go in local.conf)",
             config::Config::default().to_toml_string());
         std::process::exit(0);
     }
@@ -148,7 +149,7 @@
 fn load_config() -> Config {
     let nconfig = config::Config::default();
 
-    let conf_filename = "tempserver.conf";
+    let conf_filename = "local.conf";
     nconfig.merge_file(conf_filename)
         .unwrap_or_else(|e| {
             println!("Couldn't parse {}: {}", conf_filename, e);
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/src/params.rs	Tue Mar 07 23:56:12 2017 +0800
@@ -0,0 +1,181 @@
+extern crate tokio_core;
+extern crate futures;
+extern crate rand;
+extern crate serde_json;
+
+use std::time::Duration;
+use std::io;
+use std::str;
+use std::rc::Rc;
+use std::sync::{Arc,Mutex};
+use std::error::Error;
+use std::cell::{Cell,RefCell};
+
+use tokio_core::reactor::Interval;
+use tokio_core::reactor::Handle;
+use tokio_curl::Session;
+use futures::{Stream,Future,future};
+use curl::easy::Easy;
+use curl::easy;
+
+use types::*;
+use ::Config;
+
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct Params {
+    pub fridge_setpoint: f32,
+    pub fridge_difference: f32,
+    pub overshoot_delay: u32,
+    pub overshoot_factor: f32,
+    pub disabled: bool,
+    pub nowort: bool,
+    pub fridge_range_lower: f32,
+    pub fridge_range_upper: f32,
+
+    #[serde(skip_serializing)]
+    pub epoch: String,
+}
+
+impl Params {
+    pub fn defaults() -> Params {
+        Params {
+            fridge_setpoint: 16.0,
+            fridge_difference: 0.2,
+            overshoot_delay: 720, // 12 minutes
+            overshoot_factor: 1.0,
+            disabled: false,
+            nowort: false,
+            fridge_range_lower: 3.0,
+            fridge_range_upper: 3.0,
+            epoch: String::new(),
+            }
+    }
+
+    pub fn load(config: &Config) -> Params {
+        // generate random epoch on success.
+        // TODO: return failure? or just return default() ?
+        unimplemented!();
+    }
+}
+
+#[derive(Deserialize, Debug)]
+struct Response {
+    epoch_tag: String,
+    params: Params,
+}
+
+#[derive(Clone)]
+pub struct ParamWaiter {
+    limitlog: NotTooOften,
+    epoch_tag: RefCell<String>,
+    session: RefCell<Option<Session>>,
+
+    config: Config,
+    handle: Handle,
+}
+
+const LOG_MINUTES: u64 = 15;
+const MAX_RESPONSE_SIZE: usize = 10000;
+const TIMEOUT_MINUTES: u64 = 5;
+
+impl ParamWaiter {
+    fn make_request(&self) -> Easy {
+        let mut req = Easy::new();
+        req.get(true).unwrap();
+        // supposedly req.url won't fail, checking is later?
+        req.url(&self.config.SETTINGS_URL).unwrap();
+
+        req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
+
+        // http header
+        //   etag: epoch-tag
+        let e = self.epoch_tag.borrow();
+        if !e.is_empty() {
+            let mut list = easy::List::new();
+            let hd = format!("etag: {}", *e);
+            list.append(&hd).unwrap();
+            req.http_headers(list).unwrap();
+        }
+
+        req
+    }
+
+    fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
+        let text = String::from_utf8_lossy(buf).to_string();
+        let resp = req.response_code()?;
+        match resp {
+            200 => {
+                // new params
+                let r: Response = serde_json::from_str(&text)?;
+                *self.epoch_tag.borrow_mut() = r.epoch_tag;
+                Ok(Some(r.params))
+            }
+            304 => Ok(None), // unmodified (long polling timeout at the server)
+            _ => {
+                Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
+            },
+        }
+    }
+
+    fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
+
+        if rself.session.borrow().is_none() {
+            *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
+        }
+        let ses = rself.session.borrow().clone().unwrap();
+
+        let mut req = rself.make_request();
+        let buf = Arc::new(Mutex::new(Vec::new()));
+
+        let dst = buf.clone();
+        req.write_function(move |data| {
+            let mut dst = dst.lock().unwrap();
+            dst.extend_from_slice(data);
+            if dst.len() > MAX_RESPONSE_SIZE {
+                debug!("Too large params response from server: {}", dst.len());
+                Ok(0)
+            } else {
+                Ok(data.len())
+            }
+        }).unwrap();
+
+        let s = ses.perform(req)
+            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
+            .and_then(move |mut rq| {
+                let result = buf.lock().unwrap();
+                rself.handle_response(&mut rq, &result)
+            });
+        Box::new(s)
+
+            /*
+        let mut p = Params::defaults();
+        p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
+        future::ok(Some(p)).boxed()
+        */
+    }
+
+    fn new(config: &Config, epoch: String, handle: &Handle) -> Self {
+        ParamWaiter {
+            limitlog: NotTooOften::new(LOG_MINUTES*60),
+            epoch_tag: RefCell::new(epoch),
+            session: RefCell::new(None),
+            config: config.clone(),
+            handle: handle.clone(),
+        }
+    }
+
+    pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
+        let rcself = Rc::new(ParamWaiter::new(config, epoch, handle));
+
+        let dur = Duration::from_millis(4000);
+        let i = Interval::new(dur, &rcself.handle).unwrap()
+            .map_err(|e| TemplogError::new_io("interval failed", e))
+            .and_then(move |()| {
+                Self::step(rcself.clone())
+            })
+            // throw away None params
+            .filter_map(|p| p);
+        Box::new(i)
+    }
+}
+
--- a/rust/src/paramwaiter.rs	Tue Mar 07 23:04:02 2017 +0800
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,145 +0,0 @@
-extern crate tokio_core;
-extern crate futures;
-extern crate rand;
-extern crate serde_json;
-
-use std::time::Duration;
-use std::io;
-use std::str;
-use std::rc::Rc;
-use std::sync::{Arc,Mutex};
-use std::error::Error;
-use std::cell::{Cell,RefCell};
-
-use tokio_core::reactor::Interval;
-use tokio_core::reactor::Handle;
-use tokio_curl::Session;
-use futures::{Stream,Future,future};
-use curl::easy::Easy;
-use curl::easy;
-
-use types::*;
-use ::Config;
-
-#[derive(Deserialize, Debug)]
-struct Response {
-    epoch_tag: String,
-    params: Params,
-}
-
-#[derive(Clone)]
-pub struct ParamWaiter {
-    limitlog: NotTooOften,
-    epoch_tag: RefCell<String>,
-    session: RefCell<Option<Session>>,
-
-    config: Config,
-    handle: Handle,
-}
-
-const LOG_MINUTES: u64 = 15;
-const MAX_RESPONSE_SIZE: usize = 10000;
-const TIMEOUT_MINUTES: u64 = 5;
-
-impl ParamWaiter {
-    fn make_request(&self) -> Easy {
-        let mut req = Easy::new();
-        req.get(true).unwrap();
-        // supposedly req.url won't fail, checking is later?
-        req.url(&self.config.SETTINGS_URL).unwrap();
-
-        req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
-
-        // http header
-        //   etag: epoch-tag
-        let e = self.epoch_tag.borrow();
-        if !e.is_empty() {
-            let mut list = easy::List::new();
-            let hd = format!("etag: {}", *e);
-            list.append(&hd).unwrap();
-            req.http_headers(list).unwrap();
-        }
-
-        req
-    }
-
-    fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
-        let text = String::from_utf8_lossy(buf).to_string();
-        let resp = req.response_code()?;
-        match resp {
-            200 => {
-                // new params
-                let r: Response = serde_json::from_str(&text)?;
-                *self.epoch_tag.borrow_mut() = r.epoch_tag;
-                Ok(Some(r.params))
-            }
-            304 => Ok(None), // unmodified (long polling timeout at the server)
-            _ => {
-                Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
-            },
-        }
-    }
-
-    fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
-
-        if rself.session.borrow().is_none() {
-            *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
-        }
-        let ses = rself.session.borrow().clone().unwrap();
-
-        let mut req = rself.make_request();
-        let buf = Arc::new(Mutex::new(Vec::new()));
-
-        let dst = buf.clone();
-        req.write_function(move |data| {
-            let mut dst = dst.lock().unwrap();
-            dst.extend_from_slice(data);
-            if dst.len() > MAX_RESPONSE_SIZE {
-                debug!("Too large params response from server: {}", dst.len());
-                Ok(0)
-            } else {
-                Ok(data.len())
-            }
-        }).unwrap();
-
-        let s = ses.perform(req)
-            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
-            .and_then(move |mut rq| {
-                let result = buf.lock().unwrap();
-                rself.handle_response(&mut rq, &result)
-            });
-        Box::new(s)
-
-            /*
-        let mut p = Params::defaults();
-        p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
-        future::ok(Some(p)).boxed()
-        */
-    }
-
-    pub fn new(config: &Config, handle: &Handle) -> Self {
-        ParamWaiter {
-            limitlog: NotTooOften::new(LOG_MINUTES*60),
-            epoch_tag: RefCell::new(String::new()),
-            session: RefCell::new(None),
-            config: config.clone(),
-            handle: handle.clone(),
-        }
-    }
-
-
-    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
-        let rcself = Rc::new(ParamWaiter::new(config, handle));
-
-        let dur = Duration::from_millis(4000);
-        let i = Interval::new(dur, &rcself.handle).unwrap()
-            .map_err(|e| TemplogError::new_io("interval failed", e))
-            .and_then(move |()| {
-                Self::step(rcself.clone())
-            })
-            // throw away None params
-            .filter_map(|p| p);
-        Box::new(i)
-    }
-}
-
--- a/rust/src/types.rs	Tue Mar 07 23:04:02 2017 +0800
+++ b/rust/src/types.rs	Tue Mar 07 23:56:12 2017 +0800
@@ -14,53 +14,6 @@
 use curl;
 use serde_json;
 
-#[derive(Deserialize, Serialize, Debug)]
-pub struct Params {
-    pub fridge_setpoint: f32,
-    pub fridge_difference: f32,
-    pub overshoot_delay: u32,
-    pub overshoot_factor: f32,
-    pub disabled: bool,
-    pub nowort: bool,
-    pub fridge_range_lower: f32,
-    pub fridge_range_upper: f32,
-}
-
-impl Params {
-    pub fn defaults() -> Params {
-        Params {
-            fridge_setpoint: 16.0,
-            fridge_difference: 0.2,
-            overshoot_delay: 720, // 12 minutes
-            overshoot_factor: 1.0,
-            disabled: false,
-            nowort: false,
-            fridge_range_lower: 3.0,
-            fridge_range_upper: 3.0,
-            }
-    }
-}
-
-#[derive(Debug)]
-pub struct ParamHolder {
-    pub p: Params,
-    epoch: String,
-}
-
-impl ParamHolder {
-    pub fn new() -> ParamHolder {
-        ParamHolder {
-            p: Params::defaults(),
-            epoch: String::new(),
-        }
-    }
-
-    pub fn receive(&mut self, p: &Params, epoch: &String)
-    {
-
-    }
-}
-
 #[derive(Debug)]
 pub struct Readings {
     pub temps: HashMap<String, f32>,