view rust/src/params.rs @ 618:2d65a9f0bed3 rust

params returning stream of success
author Matt Johnston <matt@ucc.asn.au>
date Tue, 21 Mar 2017 22:35:58 +0800
parents a85c0c9bc1fa
children 8136a6b99866
line wrap: on
line source

extern crate tokio_core;
extern crate futures;
extern crate rand;
extern crate serde_json;
extern crate base64;

use std::time::Duration;
use std::io;
use std::str;
use std::rc::Rc;
use std::sync::{Arc,Mutex};
use std::error::Error;
use std::cell::{Cell,RefCell};

use tokio_core::reactor::Interval;
use tokio_core::reactor::Handle;
use tokio_curl::Session;
use futures::{Stream,Future,future};
use curl::easy::Easy;
use curl::easy;
use self::rand::Rng;

use types::*;
use ::Config;

/// Set of tunable parameters, (de)serialized with serde.
///
/// A `Params` value is carried in the `params` field of a `Response` and is
/// the item type yielded by `ParamWaiter::stream`. The built-in values are
/// given by `Params::defaults()`.
///
/// NOTE(review): units are not stated in this file; the field notes below
/// are inferred from names and defaults and should be confirmed against the
/// code that consumes them.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Params {
    /// Presumably the target fridge temperature (default 16.0) — confirm.
    pub fridge_setpoint: f32,
    /// Presumably the allowed deviation around the setpoint (default 0.2) — confirm.
    pub fridge_difference: f32,
    /// Default is 720, annotated "12 minutes" in `defaults()`, so this
    /// appears to be expressed in seconds.
    pub overshoot_delay: u32,
    /// Multiplier related to overshoot handling (default 1.0); exact meaning
    /// is not visible here — confirm.
    pub overshoot_factor: f32,
    /// Flag, default `false`; presumably disables control entirely — confirm.
    pub disabled: bool,
    /// Flag, default `false`; presumably "no wort sensor/vessel" — confirm.
    pub nowort: bool,
    /// Lower range value (default 3.0); semantics not visible here — confirm.
    pub fridge_range_lower: f32,
    /// Upper range value (default 3.0); semantics not visible here — confirm.
    pub fridge_range_upper: f32,
}

impl Params {
    /// Builds the hard-coded fallback parameter set.
    pub fn defaults() -> Params {
        // 12 minutes; evaluates to 720.
        let twelve_minutes: u32 = 12 * 60;
        // The lower and upper range defaults share the same value.
        let range: f32 = 3.0;

        Params {
            fridge_setpoint: 16.0,
            fridge_difference: 0.2,
            overshoot_delay: twelve_minutes,
            overshoot_factor: 1.0,
            disabled: false,
            nowort: false,
            fridge_range_lower: range,
            fridge_range_upper: range,
        }
    }

    /// Not implemented yet: always panics via `unimplemented!()`.
    ///
    /// `config` is currently unused.
    pub fn load(config: &Config) -> Params {
        // generate random epoch on success.
        // TODO: return failure? or just return default() ?
        let _fallback = Params::defaults();

        unimplemented!()
    }
}

/// JSON body of an HTTP 200 reply from the params server, as decoded by
/// `ParamWaiter::handle_response` using `serde_json`.
#[derive(Deserialize, Debug)]
struct Response {
    // sent as an opaque etag: header. Has format "epoch-nonce",
    // responses where the epoch does not match ParamWaiter::epoch are dropped
    etag: String,
    // the parameter set delivered with this reply
    params: Params,
}

/// State for repeatedly fetching `Params` from the server named by
/// `config.SETTINGS_URL`. Instances are built by the private `new()` and
/// driven through `ParamWaiter::stream`.
#[derive(Clone)]
pub struct ParamWaiter {
    // constructed with LOG_MINUTES*60 in new(); not otherwise used in the
    // code visible in this file
    limitlog: NotTooOften,
    // last_etag is used for long-polling.
    // It is overwritten with the etag of every 200 response and, when
    // non-empty, sent back as an "etag:" request header.
    last_etag: RefCell<String>,
    // curl session, created lazily on the first call to step()
    session: RefCell<Option<Session>>,
    // random base64 string generated in new(); compared with the part of a
    // response etag that precedes the first '-'
    epoch: String,

    // supplies SETTINGS_URL for requests
    config: Config,
    // reactor handle used to create the curl Session and the polling Interval
    handle: Handle,
}

const LOG_MINUTES: u64 = 15;
const MAX_RESPONSE_SIZE: usize = 10000;
const TIMEOUT_MINUTES: u64 = 5;

impl ParamWaiter {
    fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) {
        let mut req = Easy::new();
        req.get(true).unwrap();
        // supposedly req.url won't fail, checking happens later?
        req.url(&self.config.SETTINGS_URL).unwrap();

        req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();

        // store response
        let max_response_size = 10000;
        let buf = Arc::new(Mutex::new(Vec::new()));
        let dst = buf.clone();
        req.write_function(move |data| {
            let mut dst = dst.lock().unwrap();
            dst.extend_from_slice(data);
            if dst.len() > max_response_size {
                error!("Too large params response from server: {}", dst.len());
                Ok(0)
            } else {
                Ok(data.len())
            }
        }).unwrap();

        // http header
        let e = self.last_etag.borrow();
        if !e.is_empty() {
            let mut list = easy::List::new();
            let hd = format!("etag: {}", *e);
            list.append(&hd).unwrap();
            req.http_headers(list).unwrap();
        }

        (req, buf)
    }

    fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Params, TemplogError> {
        let text = String::from_utf8_lossy(buf).to_string();
        let resp = req.response_code()?;
        match resp {
            200 => {
                // new params
                let r: Response = serde_json::from_str(&text)?;
                let mut le = self.last_etag.borrow_mut();
                *le = r.etag;
                if le.split('-').next().unwrap() == &self.epoch {
                    Ok(r.params)
                } else {
                    Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch)))
                }
            }
            304 => {
                // XXX this isn't really an error
                Err(TemplogError::new("304 unmodified (long polling timeout at the server)"))
            },
            _ => {
                Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
            },
        }
    }

    fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> {

        if rself.session.borrow().is_none() {
            *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
        }

        let (mut req, buf) = rself.make_request();

        let ses = rself.session.borrow().clone().unwrap();
        let s = ses.perform(req)
            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
            .and_then(move |mut rq| {
                let result = buf.lock().unwrap();
                rself.handle_response(&mut rq, &result)
            });
        Box::new(s)
    }

    fn new(config: &Config, handle: &Handle) -> Self {
        let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
        rand::OsRng::new().unwrap().fill_bytes(&mut b);
        let epoch = base64::encode(&b);
        ParamWaiter {
            limitlog: NotTooOften::new(LOG_MINUTES*60),
            last_etag: RefCell::new(String::new()),
            session: RefCell::new(None),
            epoch: epoch,
            config: config.clone(),
            handle: handle.clone(),
        }
    }

    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
        let rcself = Rc::new(ParamWaiter::new(config, handle));

        let dur = Duration::from_millis(4000);
        let i = Interval::new(dur, &rcself.handle).unwrap()
            .map_err(|e| TemplogError::new_io("interval failed", e))
            .and_then(move |()| {
                Self::step(rcself.clone())
            });

        // TODO use consume_errors() instead once "impl trait" is stable
        //Box::new(consume_errors(i))
        let f = i.then(|r| {
            match r {
                Ok(v) => Ok(Some(v)),
                Err(e) => {
                    debug!("Params stream error: {}", e);
                    Ok(None)
                }
            }
        })
        .filter_map(|p| p);
        Box::new(f)
    }
}