Mercurial > templog
diff rust/src/params.rs @ 615:f153aec221be rust
move Params, epoch code
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 07 Mar 2017 23:56:12 +0800 |
parents | rust/src/paramwaiter.rs@e1bab5b36352 |
children | a85c0c9bc1fa |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rust/src/params.rs Tue Mar 07 23:56:12 2017 +0800 @@ -0,0 +1,181 @@ +extern crate tokio_core; +extern crate futures; +extern crate rand; +extern crate serde_json; + +use std::time::Duration; +use std::io; +use std::str; +use std::rc::Rc; +use std::sync::{Arc,Mutex}; +use std::error::Error; +use std::cell::{Cell,RefCell}; + +use tokio_core::reactor::Interval; +use tokio_core::reactor::Handle; +use tokio_curl::Session; +use futures::{Stream,Future,future}; +use curl::easy::Easy; +use curl::easy; + +use types::*; +use ::Config; + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct Params { + pub fridge_setpoint: f32, + pub fridge_difference: f32, + pub overshoot_delay: u32, + pub overshoot_factor: f32, + pub disabled: bool, + pub nowort: bool, + pub fridge_range_lower: f32, + pub fridge_range_upper: f32, + + #[serde(skip_serializing)] + pub epoch: String, +} + +impl Params { + pub fn defaults() -> Params { + Params { + fridge_setpoint: 16.0, + fridge_difference: 0.2, + overshoot_delay: 720, // 12 minutes + overshoot_factor: 1.0, + disabled: false, + nowort: false, + fridge_range_lower: 3.0, + fridge_range_upper: 3.0, + epoch: String::new(), + } + } + + pub fn load(config: &Config) -> Params { + // generate random epoch on success. + // TODO: return failure? or just return default() ? + unimplemented!(); + } +} + +#[derive(Deserialize, Debug)] +struct Response { + epoch_tag: String, + params: Params, +} + +#[derive(Clone)] +pub struct ParamWaiter { + limitlog: NotTooOften, + epoch_tag: RefCell<String>, + session: RefCell<Option<Session>>, + + config: Config, + handle: Handle, +} + +const LOG_MINUTES: u64 = 15; +const MAX_RESPONSE_SIZE: usize = 10000; +const TIMEOUT_MINUTES: u64 = 5; + +impl ParamWaiter { + fn make_request(&self) -> Easy { + let mut req = Easy::new(); + req.get(true).unwrap(); + // supposedly req.url won't fail, checking is later? + req.url(&self.config.SETTINGS_URL).unwrap(); + + req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); + + // http header + // etag: epoch-tag + let e = self.epoch_tag.borrow(); + if !e.is_empty() { + let mut list = easy::List::new(); + let hd = format!("etag: {}", *e); + list.append(&hd).unwrap(); + req.http_headers(list).unwrap(); + } + + req + } + + fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { + let text = String::from_utf8_lossy(buf).to_string(); + let resp = req.response_code()?; + match resp { + 200 => { + // new params + let r: Response = serde_json::from_str(&text)?; + *self.epoch_tag.borrow_mut() = r.epoch_tag; + Ok(Some(r.params)) + } + 304 => Ok(None), // unmodified (long polling timeout at the server) + _ => { + Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) + }, + } + } + + fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { + + if rself.session.borrow().is_none() { + *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); + } + let ses = rself.session.borrow().clone().unwrap(); + + let mut req = rself.make_request(); + let buf = Arc::new(Mutex::new(Vec::new())); + + let dst = buf.clone(); + req.write_function(move |data| { + let mut dst = dst.lock().unwrap(); + dst.extend_from_slice(data); + if dst.len() > MAX_RESPONSE_SIZE { + debug!("Too large params response from server: {}", dst.len()); + Ok(0) + } else { + Ok(data.len()) + } + }).unwrap(); + + let s = ses.perform(req) + .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) + .and_then(move |mut rq| { + let result = buf.lock().unwrap(); + rself.handle_response(&mut rq, &result) + }); + Box::new(s) + + /* + let mut p = Params::defaults(); + p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); + future::ok(Some(p)).boxed() + */ + } + + fn new(config: &Config, epoch: String, handle: &Handle) -> Self { + ParamWaiter { + limitlog: NotTooOften::new(LOG_MINUTES*60), + epoch_tag: RefCell::new(epoch), + session: RefCell::new(None), + config: config.clone(), + handle: handle.clone(), + } + } + + pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { + let rcself = Rc::new(ParamWaiter::new(config, epoch, handle)); + + let dur = Duration::from_millis(4000); + let i = Interval::new(dur, &rcself.handle).unwrap() + .map_err(|e| TemplogError::new_io("interval failed", e)) + .and_then(move |()| { + Self::step(rcself.clone()) + }) + // throw away None params + .filter_map(|p| p); + Box::new(i) + } +} +