view rust/src/params.rs @ 615:f153aec221be rust

move Params, epoch code
author Matt Johnston <matt@ucc.asn.au>
date Tue, 07 Mar 2017 23:56:12 +0800
parents rust/src/paramwaiter.rs@e1bab5b36352
children a85c0c9bc1fa
line wrap: on
line source

extern crate tokio_core;
extern crate futures;
extern crate rand;
extern crate serde_json;

use std::time::Duration;
use std::io;
use std::str;
use std::rc::Rc;
use std::sync::{Arc,Mutex};
use std::error::Error;
use std::cell::{Cell,RefCell};

use tokio_core::reactor::Interval;
use tokio_core::reactor::Handle;
use tokio_curl::Session;
use futures::{Stream,Future,future};
use curl::easy::Easy;
use curl::easy;

use types::*;
use ::Config;

/// Tunable fridge-controller parameters.
///
/// Deserialized from the `params` member of the settings server's reply (see
/// `Response`) and also serializable. Built-in fallback values come from
/// `Params::defaults()`.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Params {
    /// Target temperature; defaults to 16.0.
    /// NOTE(review): presumably degrees Celsius - confirm.
    pub fridge_setpoint: f32,
    /// Defaults to 0.2.
    /// NOTE(review): presumably the tolerated deviation from the setpoint - confirm.
    pub fridge_difference: f32,
    /// Defaults to 720, which the defaults annotate as 12 minutes (so seconds).
    pub overshoot_delay: u32,
    /// Defaults to 1.0.
    /// NOTE(review): presumably scales an overshoot compensation - confirm.
    pub overshoot_factor: f32,
    /// Defaults to false.
    /// NOTE(review): presumably turns fridge control off when true - confirm.
    pub disabled: bool,
    /// Defaults to false.
    /// NOTE(review): presumably signals that no wort sensor is in use - confirm.
    pub nowort: bool,
    /// Defaults to 3.0.
    pub fridge_range_lower: f32,
    /// Defaults to 3.0.
    pub fridge_range_upper: f32,

    /// Left out when serializing, but still read when deserializing.
    /// Empty in `Params::defaults()`.
    #[serde(skip_serializing)]
    pub epoch: String,
}

impl Params {
    /// Returns the built-in parameter set, with an empty `epoch`.
    pub fn defaults() -> Params {
        // 12 minutes
        let overshoot_delay = 12 * 60;
        let no_epoch = String::new();

        Params {
            epoch: no_epoch,

            fridge_setpoint: 16.0,
            fridge_difference: 0.2,
            fridge_range_lower: 3.0,
            fridge_range_upper: 3.0,

            overshoot_delay: overshoot_delay,
            overshoot_factor: 1.0,

            disabled: false,
            nowort: false,
        }
    }

    /// Placeholder for loading parameters using `config`.
    ///
    /// # Panics
    ///
    /// Always panics: the body is still `unimplemented!()`.
    pub fn load(config: &Config) -> Params {
        // generate random epoch on success.
        // TODO: return failure? or just return default() ?
        unimplemented!()
    }
}

/// JSON body of a `200` reply from the settings server, as parsed by
/// `ParamWaiter::handle_response`.
#[derive(Deserialize, Debug)]
struct Response {
    /// Tag stored by `ParamWaiter` and sent back in the `etag` header of
    /// subsequent requests.
    epoch_tag: String,
    /// The new parameter set handed on to the consumer of the stream.
    params: Params,
}

/// Fetches `Params` updates from the settings server over HTTP.
///
/// Instances are created internally by `ParamWaiter::stream`.
#[derive(Clone)]
pub struct ParamWaiter {
    // Built with a LOG_MINUTES*60 period in `new`; not otherwise used in this file.
    limitlog: NotTooOften,
    // Last epoch tag known; sent as the `etag` request header when non-empty
    // and replaced whenever a 200 response is parsed.
    epoch_tag: RefCell<String>,
    // curl session, created on the first call to `step`.
    session: RefCell<Option<Session>>,

    // Supplies SETTINGS_URL for requests.
    config: Config,
    // Reactor handle used for the curl session and the interval timer.
    handle: Handle,
}

// Period, in minutes, given to the NotTooOften limiter (converted to seconds in `new`).
const LOG_MINUTES: u64 = 15;
// Upper bound, in bytes, on the response body accumulated for one request.
const MAX_RESPONSE_SIZE: usize = 10000;
// curl timeout for a single request, in minutes (converted to seconds in `make_request`).
const TIMEOUT_MINUTES: u64 = 5;

impl ParamWaiter {
    /// Builds the curl GET request for `config.SETTINGS_URL`.
    ///
    /// The request gets a timeout of `TIMEOUT_MINUTES` and, when an epoch tag
    /// is already known, an `etag: <epoch tag>` request header.
    ///
    /// # Panics
    ///
    /// Panics if curl rejects any of the options being set.
    fn make_request(&self) -> Easy {
        let mut req = Easy::new();
        req.get(true).unwrap();
        // supposedly req.url won't fail, checking is later?
        req.url(&self.config.SETTINGS_URL).unwrap();

        req.timeout(Duration::from_secs(TIMEOUT_MINUTES * 60)).unwrap();

        // http header
        //   etag: epoch-tag
        // NOTE(review): `etag` is normally a response header; presumably the
        // server side of this protocol reads it from the request - confirm.
        let epoch = self.epoch_tag.borrow();
        if !epoch.is_empty() {
            let mut headers = easy::List::new();
            let header = format!("etag: {}", *epoch);
            headers.append(&header).unwrap();
            req.http_headers(headers).unwrap();
        }

        req
    }

    /// Interprets a completed request.
    ///
    /// * `req` - the finished curl handle, queried for the HTTP status code.
    /// * `buf` - the response body collected by the write callback.
    ///
    /// Returns `Ok(Some(params))` for a `200` reply (also remembering its
    /// epoch tag), `Ok(None)` for `304`, and an error for any other status,
    /// for a body that does not parse as `Response`, or when the status code
    /// cannot be read.
    fn handle_response(&self, req: &mut Easy, buf: &[u8]) -> Result<Option<Params>, TemplogError> {
        let code = req.response_code()?;
        // Lossy decoding never fails; the Cow only allocates when the body
        // contains invalid UTF-8.
        let text = String::from_utf8_lossy(buf);
        match code {
            200 => {
                // new params
                let r: Response = serde_json::from_str(&text)?;
                *self.epoch_tag.borrow_mut() = r.epoch_tag;
                Ok(Some(r.params))
            }
            304 => Ok(None), // unmodified (long polling timeout at the server)
            _ => {
                Err(TemplogError::new(&format!("Wrong server response code {}: {}", code, text)))
            },
        }
    }

    /// Performs one request against the settings server.
    ///
    /// Creates the curl session on first use, collects the response body
    /// (capped at `MAX_RESPONSE_SIZE` bytes) and resolves to the result of
    /// `handle_response`. Transfer failures surface as a `TemplogError`.
    fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {

        if rself.session.borrow().is_none() {
            *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
        }
        let ses = rself.session.borrow().clone().unwrap();

        let mut req = rself.make_request();
        // Shared between the write callback and the completion handler.
        let buf = Arc::new(Mutex::new(Vec::new()));

        let dst = buf.clone();
        req.write_function(move |data| {
            let mut dst = dst.lock().unwrap();
            // Check the cap before copying so the buffer never grows past it.
            let total = dst.len() + data.len();
            if total > MAX_RESPONSE_SIZE {
                debug!("Too large params response from server: {}", total);
                // Reporting fewer bytes than were offered makes curl abort the transfer.
                Ok(0)
            } else {
                dst.extend_from_slice(data);
                Ok(data.len())
            }
        }).unwrap();

        let s = ses.perform(req)
            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
            .and_then(move |mut rq| {
                let result = buf.lock().unwrap();
                rself.handle_response(&mut rq, &result)
            });
        Box::new(s)
    }

    /// Creates a waiter with no session yet, starting from the given epoch tag.
    fn new(config: &Config, epoch: String, handle: &Handle) -> Self {
        ParamWaiter {
            limitlog: NotTooOften::new(LOG_MINUTES*60),
            epoch_tag: RefCell::new(epoch),
            session: RefCell::new(None),
            config: config.clone(),
            handle: handle.clone(),
        }
    }

    /// Returns a stream of parameter updates from the settings server.
    ///
    /// * `config` - supplies the settings URL.
    /// * `epoch` - initial epoch tag; sent with requests when non-empty.
    /// * `handle` - reactor handle driving the timer and the HTTP session.
    ///
    /// Every tick of an interval timer triggers one `step`; replies that
    /// carry no new parameters (`None`) are dropped from the stream.
    ///
    /// # Panics
    ///
    /// Panics if the interval timer cannot be created.
    pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
        // Period of the interval timer that triggers each request.
        const POLL_INTERVAL_MS: u64 = 4000;

        let rcself = Rc::new(ParamWaiter::new(config, epoch, handle));

        let dur = Duration::from_millis(POLL_INTERVAL_MS);
        let i = Interval::new(dur, &rcself.handle).unwrap()
            .map_err(|e| TemplogError::new_io("interval failed", e))
            .and_then(move |()| {
                Self::step(rcself.clone())
            })
            // throw away None params
            .filter_map(|p| p);
        Box::new(i)
    }
}