diff rust/src/params.rs @ 615:f153aec221be rust

move Params, epoch code
author Matt Johnston <matt@ucc.asn.au>
date Tue, 07 Mar 2017 23:56:12 +0800
parents rust/src/paramwaiter.rs@e1bab5b36352
children a85c0c9bc1fa
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/src/params.rs	Tue Mar 07 23:56:12 2017 +0800
@@ -0,0 +1,181 @@
+extern crate tokio_core;
+extern crate futures;
+extern crate rand;
+extern crate serde_json;
+
+use std::time::Duration;
+use std::io;
+use std::str;
+use std::rc::Rc;
+use std::sync::{Arc,Mutex};
+use std::error::Error;
+use std::cell::{Cell,RefCell};
+
+use tokio_core::reactor::Interval;
+use tokio_core::reactor::Handle;
+use tokio_curl::Session;
+use futures::{Stream,Future,future};
+use curl::easy::Easy;
+use curl::easy;
+
+use types::*;
+use ::Config;
+
+#[derive(Deserialize, Serialize, Debug, Clone)]
+pub struct Params {
+    pub fridge_setpoint: f32,
+    pub fridge_difference: f32,
+    pub overshoot_delay: u32,
+    pub overshoot_factor: f32,
+    pub disabled: bool,
+    pub nowort: bool,
+    pub fridge_range_lower: f32,
+    pub fridge_range_upper: f32,
+
+    #[serde(skip_serializing)]
+    pub epoch: String,
+}
+
+impl Params {
+    pub fn defaults() -> Params {
+        Params {
+            fridge_setpoint: 16.0,
+            fridge_difference: 0.2,
+            overshoot_delay: 720, // 12 minutes
+            overshoot_factor: 1.0,
+            disabled: false,
+            nowort: false,
+            fridge_range_lower: 3.0,
+            fridge_range_upper: 3.0,
+            epoch: String::new(),
+            }
+    }
+
+    pub fn load(config: &Config) -> Params {
+        // generate random epoch on success.
+        // TODO: return failure? or just return default() ?
+        unimplemented!();
+    }
+}
+
+#[derive(Deserialize, Debug)]
+struct Response {
+    epoch_tag: String,
+    params: Params,
+}
+
+#[derive(Clone)]
+pub struct ParamWaiter {
+    limitlog: NotTooOften,
+    epoch_tag: RefCell<String>,
+    session: RefCell<Option<Session>>,
+
+    config: Config,
+    handle: Handle,
+}
+
+const LOG_MINUTES: u64 = 15;
+const MAX_RESPONSE_SIZE: usize = 10000;
+const TIMEOUT_MINUTES: u64 = 5;
+
+impl ParamWaiter {
+    fn make_request(&self) -> Easy {
+        let mut req = Easy::new();
+        req.get(true).unwrap();
+        // supposedly req.url won't fail, checking is later?
+        req.url(&self.config.SETTINGS_URL).unwrap();
+
+        req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
+
+        // http header
+        //   etag: epoch-tag
+        let e = self.epoch_tag.borrow();
+        if !e.is_empty() {
+            let mut list = easy::List::new();
+            let hd = format!("etag: {}", *e);
+            list.append(&hd).unwrap();
+            req.http_headers(list).unwrap();
+        }
+
+        req
+    }
+
+    fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
+        let text = String::from_utf8_lossy(buf).to_string();
+        let resp = req.response_code()?;
+        match resp {
+            200 => {
+                // new params
+                let r: Response = serde_json::from_str(&text)?;
+                *self.epoch_tag.borrow_mut() = r.epoch_tag;
+                Ok(Some(r.params))
+            }
+            304 => Ok(None), // unmodified (long polling timeout at the server)
+            _ => {
+                Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
+            },
+        }
+    }
+
+    fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
+
+        if rself.session.borrow().is_none() {
+            *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
+        }
+        let ses = rself.session.borrow().clone().unwrap();
+
+        let mut req = rself.make_request();
+        let buf = Arc::new(Mutex::new(Vec::new()));
+
+        let dst = buf.clone();
+        req.write_function(move |data| {
+            let mut dst = dst.lock().unwrap();
+            dst.extend_from_slice(data);
+            if dst.len() > MAX_RESPONSE_SIZE {
+                debug!("Too large params response from server: {}", dst.len());
+                Ok(0)
+            } else {
+                Ok(data.len())
+            }
+        }).unwrap();
+
+        let s = ses.perform(req)
+            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
+            .and_then(move |mut rq| {
+                let result = buf.lock().unwrap();
+                rself.handle_response(&mut rq, &result)
+            });
+        Box::new(s)
+
+            /*
+        let mut p = Params::defaults();
+        p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
+        future::ok(Some(p)).boxed()
+        */
+    }
+
+    fn new(config: &Config, epoch: String, handle: &Handle) -> Self {
+        ParamWaiter {
+            limitlog: NotTooOften::new(LOG_MINUTES*60),
+            epoch_tag: RefCell::new(epoch),
+            session: RefCell::new(None),
+            config: config.clone(),
+            handle: handle.clone(),
+        }
+    }
+
+    pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
+        let rcself = Rc::new(ParamWaiter::new(config, epoch, handle));
+
+        let dur = Duration::from_millis(4000);
+        let i = Interval::new(dur, &rcself.handle).unwrap()
+            .map_err(|e| TemplogError::new_io("interval failed", e))
+            .and_then(move |()| {
+                Self::step(rcself.clone())
+            })
+            // throw away None params
+            .filter_map(|p| p);
+        Box::new(i)
+    }
+}
+