# HG changeset patch # User Matt Johnston # Date 1488902172 -28800 # Node ID f153aec221be06cba0bebf18211f255224f5f5ec # Parent e1bab5b36352eaddb8ef0545097af513f74695be move Params, epoch code diff -r e1bab5b36352 -r f153aec221be rust/src/fridge.rs --- a/rust/src/fridge.rs Tue Mar 07 23:04:02 2017 +0800 +++ b/rust/src/fridge.rs Tue Mar 07 23:56:12 2017 +0800 @@ -11,7 +11,8 @@ use tokio_core::reactor::{Timeout,Handle}; use futures::sync::{mpsc}; -use ::Config; +use config::Config; +use params::Params; use types::*; #[derive(Debug)] @@ -57,7 +58,7 @@ let (s, r) = mpsc::channel(1); let mut f = Fridge { config: config.clone(), - params: p, + params: p.clone(), temp_wort: None, temp_fridge: None, diff -r e1bab5b36352 -r f153aec221be rust/src/main.rs --- a/rust/src/main.rs Tue Mar 07 23:04:02 2017 +0800 +++ b/rust/src/main.rs Tue Mar 07 23:56:12 2017 +0800 @@ -31,7 +31,7 @@ mod sensor; mod fridge; mod types; -mod paramwaiter; +mod params; use types::*; use config::Config; @@ -41,8 +41,9 @@ let mut core = Core::new().unwrap(); let handle = core.handle(); - let mut paramh = ParamHolder::new(); - let mut fridge = fridge::Fridge::new(&config, nowait, paramh.p, &handle); + let params = params::Params::load(&config); + let epoch = params.epoch.clone(); + let mut fridge = fridge::Fridge::new(&config, nowait, params, &handle); let (fridge_reading_s, fridge_reading_r) = mpsc::channel(1); let fridge_reading_r = fridge_reading_r.map_err(|e| TemplogError::new("Problem with fridge_reading_r channel")); @@ -71,7 +72,7 @@ r }); - let param_stream = paramwaiter::ParamWaiter::stream(config, &handle); + let param_stream = params::ParamWaiter::stream(config, epoch, &handle); let p = param_stream.map(|p| { fridge::Message::Params(p) }); @@ -99,7 +100,7 @@ -d, --debug -t, --test Use fake sensors etc --nowait Skip initial fridge wait - --defconf Print default config (customise in tempserver.conf) + --defconf Print default config (customise in local.conf) --thisconf Print used config "; @@ -138,7 +139,7 @@ if args.flag_defconf { println!("Default configuration:\n{}\n\n{}", - "(custom options go in tempserver.conf)", + "(custom options go in local.conf)", config::Config::default().to_toml_string()); std::process::exit(0); } @@ -148,7 +149,7 @@ fn load_config() -> Config { let nconfig = config::Config::default(); - let conf_filename = "tempserver.conf"; + let conf_filename = "local.conf"; nconfig.merge_file(conf_filename) .unwrap_or_else(|e| { println!("Couldn't parse {}: {}", conf_filename, e); diff -r e1bab5b36352 -r f153aec221be rust/src/params.rs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rust/src/params.rs Tue Mar 07 23:56:12 2017 +0800 @@ -0,0 +1,181 @@ +extern crate tokio_core; +extern crate futures; +extern crate rand; +extern crate serde_json; + +use std::time::Duration; +use std::io; +use std::str; +use std::rc::Rc; +use std::sync::{Arc,Mutex}; +use std::error::Error; +use std::cell::{Cell,RefCell}; + +use tokio_core::reactor::Interval; +use tokio_core::reactor::Handle; +use tokio_curl::Session; +use futures::{Stream,Future,future}; +use curl::easy::Easy; +use curl::easy; + +use types::*; +use ::Config; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct Params { + pub fridge_setpoint: f32, + pub fridge_difference: f32, + pub overshoot_delay: u32, + pub overshoot_factor: f32, + pub disabled: bool, + pub nowort: bool, + pub fridge_range_lower: f32, + pub fridge_range_upper: f32, + + #[serde(skip_serializing)] + pub epoch: String, +} + +impl Params { + pub fn defaults() -> Params { + Params { + fridge_setpoint: 16.0, + fridge_difference: 0.2, + overshoot_delay: 720, // 12 minutes + overshoot_factor: 1.0, + disabled: false, + nowort: false, + fridge_range_lower: 3.0, + fridge_range_upper: 3.0, + epoch: String::new(), + } + } + + pub fn load(config: &Config) -> Params { + // generate random epoch on success. + // TODO: return failure? or just return default() ? + unimplemented!(); + } +} + +#[derive(Deserialize, Debug)] +struct Response { + epoch_tag: String, + params: Params, +} + +#[derive(Clone)] +pub struct ParamWaiter { + limitlog: NotTooOften, + epoch_tag: RefCell, + session: RefCell>, + + config: Config, + handle: Handle, +} + +const LOG_MINUTES: u64 = 15; +const MAX_RESPONSE_SIZE: usize = 10000; +const TIMEOUT_MINUTES: u64 = 5; + +impl ParamWaiter { + fn make_request(&self) -> Easy { + let mut req = Easy::new(); + req.get(true).unwrap(); + // supposedly req.url won't fail, checking is later? + req.url(&self.config.SETTINGS_URL).unwrap(); + + req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); + + // http header + // etag: epoch-tag + let e = self.epoch_tag.borrow(); + if !e.is_empty() { + let mut list = easy::List::new(); + let hd = format!("etag: {}", *e); + list.append(&hd).unwrap(); + req.http_headers(list).unwrap(); + } + + req + } + + fn handle_response(&self, req: &mut Easy, buf: &Vec) -> Result, TemplogError> { + let text = String::from_utf8_lossy(buf).to_string(); + let resp = req.response_code()?; + match resp { + 200 => { + // new params + let r: Response = serde_json::from_str(&text)?; + *self.epoch_tag.borrow_mut() = r.epoch_tag; + Ok(Some(r.params)) + } + 304 => Ok(None), // unmodified (long polling timeout at the server) + _ => { + Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) + }, + } + } + + fn step(rself: Rc) -> Box, Error=TemplogError>> { + + if rself.session.borrow().is_none() { + *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); + } + let ses = rself.session.borrow().clone().unwrap(); + + let mut req = rself.make_request(); + let buf = Arc::new(Mutex::new(Vec::new())); + + let dst = buf.clone(); + req.write_function(move |data| { + let mut dst = dst.lock().unwrap(); + dst.extend_from_slice(data); + if dst.len() > MAX_RESPONSE_SIZE { + debug!("Too large params response from server: {}", dst.len()); + Ok(0) + } else { + Ok(data.len()) + } + }).unwrap(); + + let s = ses.perform(req) + .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) + .and_then(move |mut rq| { + let result = buf.lock().unwrap(); + rself.handle_response(&mut rq, &result) + }); + Box::new(s) + + /* + let mut p = Params::defaults(); + p.fridge_setpoint = 17.0 + 4.0*rand::random::(); + future::ok(Some(p)).boxed() + */ + } + + fn new(config: &Config, epoch: String, handle: &Handle) -> Self { + ParamWaiter { + limitlog: NotTooOften::new(LOG_MINUTES*60), + epoch_tag: RefCell::new(epoch), + session: RefCell::new(None), + config: config.clone(), + handle: handle.clone(), + } + } + + pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box> { + let rcself = Rc::new(ParamWaiter::new(config, epoch, handle)); + + let dur = Duration::from_millis(4000); + let i = Interval::new(dur, &rcself.handle).unwrap() + .map_err(|e| TemplogError::new_io("interval failed", e)) + .and_then(move |()| { + Self::step(rcself.clone()) + }) + // throw away None params + .filter_map(|p| p); + Box::new(i) + } +} + diff -r e1bab5b36352 -r f153aec221be rust/src/paramwaiter.rs --- a/rust/src/paramwaiter.rs Tue Mar 07 23:04:02 2017 +0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,145 +0,0 @@ -extern crate tokio_core; -extern crate futures; -extern crate rand; -extern crate serde_json; - -use std::time::Duration; -use std::io; -use std::str; -use std::rc::Rc; -use std::sync::{Arc,Mutex}; -use std::error::Error; -use std::cell::{Cell,RefCell}; - -use tokio_core::reactor::Interval; -use tokio_core::reactor::Handle; -use tokio_curl::Session; -use futures::{Stream,Future,future}; -use curl::easy::Easy; -use curl::easy; - -use types::*; -use ::Config; - -#[derive(Deserialize, Debug)] -struct Response { - epoch_tag: String, - params: Params, -} - -#[derive(Clone)] -pub struct ParamWaiter { - limitlog: NotTooOften, - epoch_tag: RefCell, - session: RefCell>, - - config: Config, - handle: Handle, -} - -const LOG_MINUTES: u64 = 15; -const MAX_RESPONSE_SIZE: usize = 10000; -const TIMEOUT_MINUTES: u64 = 5; - -impl ParamWaiter { - fn make_request(&self) -> Easy { - let mut req = Easy::new(); - req.get(true).unwrap(); - // supposedly req.url won't fail, checking is later? - req.url(&self.config.SETTINGS_URL).unwrap(); - - req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); - - // http header - // etag: epoch-tag - let e = self.epoch_tag.borrow(); - if !e.is_empty() { - let mut list = easy::List::new(); - let hd = format!("etag: {}", *e); - list.append(&hd).unwrap(); - req.http_headers(list).unwrap(); - } - - req - } - - fn handle_response(&self, req: &mut Easy, buf: &Vec) -> Result, TemplogError> { - let text = String::from_utf8_lossy(buf).to_string(); - let resp = req.response_code()?; - match resp { - 200 => { - // new params - let r: Response = serde_json::from_str(&text)?; - *self.epoch_tag.borrow_mut() = r.epoch_tag; - Ok(Some(r.params)) - } - 304 => Ok(None), // unmodified (long polling timeout at the server) - _ => { - Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) - }, - } - } - - fn step(rself: Rc) -> Box, Error=TemplogError>> { - - if rself.session.borrow().is_none() { - *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); - } - let ses = rself.session.borrow().clone().unwrap(); - - let mut req = rself.make_request(); - let buf = Arc::new(Mutex::new(Vec::new())); - - let dst = buf.clone(); - req.write_function(move |data| { - let mut dst = dst.lock().unwrap(); - dst.extend_from_slice(data); - if dst.len() > MAX_RESPONSE_SIZE { - debug!("Too large params response from server: {}", dst.len()); - Ok(0) - } else { - Ok(data.len()) - } - }).unwrap(); - - let s = ses.perform(req) - .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) - .and_then(move |mut rq| { - let result = buf.lock().unwrap(); - rself.handle_response(&mut rq, &result) - }); - Box::new(s) - - /* - let mut p = Params::defaults(); - p.fridge_setpoint = 17.0 + 4.0*rand::random::(); - future::ok(Some(p)).boxed() - */ - } - - pub fn new(config: &Config, handle: &Handle) -> Self { - ParamWaiter { - limitlog: NotTooOften::new(LOG_MINUTES*60), - epoch_tag: RefCell::new(String::new()), - session: RefCell::new(None), - config: config.clone(), - handle: handle.clone(), - } - } - - - pub fn stream(config: &Config, handle: &Handle) -> Box> { - let rcself = Rc::new(ParamWaiter::new(config, handle)); - - let dur = Duration::from_millis(4000); - let i = Interval::new(dur, &rcself.handle).unwrap() - .map_err(|e| TemplogError::new_io("interval failed", e)) - .and_then(move |()| { - Self::step(rcself.clone()) - }) - // throw away None params - .filter_map(|p| p); - Box::new(i) - } -} - diff -r e1bab5b36352 -r f153aec221be rust/src/types.rs --- a/rust/src/types.rs Tue Mar 07 23:04:02 2017 +0800 +++ b/rust/src/types.rs Tue Mar 07 23:56:12 2017 +0800 @@ -14,53 +14,6 @@ use curl; use serde_json; -#[derive(Deserialize, Serialize, Debug)] -pub struct Params { - pub fridge_setpoint: f32, - pub fridge_difference: f32, - pub overshoot_delay: u32, - pub overshoot_factor: f32, - pub disabled: bool, - pub nowort: bool, - pub fridge_range_lower: f32, - pub fridge_range_upper: f32, -} - -impl Params { - pub fn defaults() -> Params { - Params { - fridge_setpoint: 16.0, - fridge_difference: 0.2, - overshoot_delay: 720, // 12 minutes - overshoot_factor: 1.0, - disabled: false, - nowort: false, - fridge_range_lower: 3.0, - fridge_range_upper: 3.0, - } - } -} - -#[derive(Debug)] -pub struct ParamHolder { - pub p: Params, - epoch: String, -} - -impl ParamHolder { - pub fn new() -> ParamHolder { - ParamHolder { - p: Params::defaults(), - epoch: String::new(), - } - } - - pub fn receive(&mut self, p: &Params, epoch: &String) - { - - } -} - #[derive(Debug)] pub struct Readings { pub temps: HashMap,