view rust/src/types.rs @ 611:f3e39e2107fd rust

still doesn't compile, improvements to TemplogError and tokio curl though
author Matt Johnston <matt@ucc.asn.au>
date Tue, 28 Feb 2017 22:58:47 +0800
parents af0dac00d40b
children 5fc41e0833b4
line wrap: on
line source

use std::collections::HashMap;
use std::time::{Duration,Instant};
use std::error::Error;
use std::fmt;
use std::io;
use std::cmp;
use std::cell::Cell;
use std::iter::FromIterator;
use std;

use futures::{Stream,IntoFuture};
use serde::{Deserialize,Serialize};
use toml;
use curl;
use serde_json;

#[derive(Deserialize, Serialize, Debug)]
pub struct Params {
    pub fridge_setpoint: f32,
    pub fridge_difference: f32,
    pub overshoot_delay: u32,
    pub overshoot_factor: f32,
    pub disabled: bool,
    pub nowort: bool,
    pub fridge_range_lower: f32,
    pub fridge_range_upper: f32,
}

impl Params {
    pub fn defaults() -> Params {
        Params {
            fridge_setpoint: 16.0,
            fridge_difference: 0.2,
            overshoot_delay: 720, // 12 minutes
            overshoot_factor: 1.0,
            disabled: false,
            nowort: false,
            fridge_range_lower: 3.0,
            fridge_range_upper: 3.0,
            }
    }
}

#[derive(Debug)]
pub struct ParamHolder {
    pub p: Params,
    epoch: String,
}

impl ParamHolder {
    pub fn new() -> ParamHolder {
        ParamHolder {
            p: Params::defaults(),
            epoch: String::new(),
        }
    }

    pub fn receive(&mut self, p: &Params, epoch: &String)
    {

    }
}

#[derive(Debug)]
pub struct Readings {
    pub temps: HashMap<String, f32>,
}

impl Readings {
    pub fn new() -> Readings {
        Readings {
            temps: HashMap::new(),
        }
    }

    pub fn add(&mut self, name: &str, v: f32) {
        self.temps.insert(name.to_string(), v);
    }

    pub fn get_temp(&self, name: &str) -> Option<f32> {
        self.temps.get(name).map(|f| *f)
    }
}

#[derive(Debug)]
pub enum TemplogErrorKind {
    None,
    Io(io::Error),
    ParseFloat(std::num::ParseFloatError),
    TomlDe(toml::de::Error),
    SerdeJson(serde_json::Error),
    Curl(curl::Error),
}

#[derive(Debug)]
pub struct TemplogError {
    desc: String,
    kind: TemplogErrorKind,
}

impl Error for TemplogError {
    fn description(&self) -> &str { 
        &format!("{}", self)
    }

    fn cause(&self) -> Option<&Error> { 
        match self.kind {
            TemplogErrorKind::None => None,
            TemplogErrorKind::Io(e) => Some(&e),
            TemplogErrorKind::ParseFloat(e) => Some(&e),
            TemplogErrorKind::TomlDe(e) => Some(&e),
            TemplogErrorKind::SerdeJson(e) => Some(&e),
            TemplogErrorKind::Curl(e) => Some(&e),
        }
    }
}

impl fmt::Display for TemplogError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self.kind {
            TemplogErrorKind::None => write!(f, "Templog Error: {}", self.desc),
            TemplogErrorKind::Io(e) => write!(f, "Templog IO error {}: {}", self.desc, e),
            TemplogErrorKind::TomlDe(e) => write!(f, "Templog toml error {}: {}", self.desc, e),
            TemplogErrorKind::SerdeJson(e) => write!(f, "Json decode error {}: {}", self.desc, e),
            TemplogErrorKind::ParseFloat(e) => write!(f, "Templog parse error {}: {}", self.desc, e),
            TemplogErrorKind::Curl(e) => write!(f, "Templog curl http error {}: {}", self.desc, e),
        };
        Ok(())
    }
}

impl TemplogError {
    pub fn new(desc: &str) -> Self {
        TemplogError { 
            desc: desc.to_string(),
            kind: TemplogErrorKind::None,
        }
    }

    pub fn new_io(desc: &str, e: io::Error) -> Self {
        TemplogError { 
            desc: desc.to_string(),
            kind: TemplogErrorKind::Io(e),
        }
    }

    pub fn new_toml_de(desc: &str, e: toml::de::Error) -> Self {
        TemplogError { 
            desc: desc.to_string(),
            kind: TemplogErrorKind::TomlDe(e),
        }
    }

    pub fn new_parse_float(desc: &str, e: std::num::ParseFloatError) -> Self {
        TemplogError { 
            desc: desc.to_string(),
            kind: TemplogErrorKind::ParseFloat(e),
        }
    }

    pub fn new_curl(desc: &str, e: curl::Error) -> Self {
        TemplogError { 
            desc: desc.to_string(),
            kind: TemplogErrorKind::Curl(e),
        }
    }

    pub fn new_serde_json(desc: &str, e: serde_json::Error) -> Self {
        TemplogError { 
            desc: desc.to_string(),
            kind: TemplogErrorKind::SerdeJson(e),
        }
    }
}

impl From<io::Error> for TemplogError {
    fn from(e: io::Error) -> Self {
        TemplogError::new_io("", e)
    }
}

impl From<toml::de::Error> for TemplogError {
    fn from(e: toml::de::Error) -> Self {
        TemplogError::new_toml_de("", e)
    }
}

impl From<std::num::ParseFloatError> for TemplogError {
    fn from(e: std::num::ParseFloatError) -> Self {
        TemplogError::new_parse_float("", e)
    }
}

impl From<curl::Error> for TemplogError {
    fn from(e: curl::Error) -> Self {
        TemplogError::new_curl("", e)
    }
}

impl From<serde_json::Error> for TemplogError {
    fn from(e: serde_json::Error) -> Self {
        TemplogError::new_serde_json("", e)
    }
}

/// Call closures with a rate limit. Useful for log message ratelimiting
#[derive(Clone)]
pub struct NotTooOften {
    last: Cell<Instant>,
    limit: Duration,
}

impl NotTooOften {
    pub fn new(limit_secs: u64) -> Self {
        NotTooOften {
            limit: Duration::new(limit_secs, 0),
            last: Cell::new(Instant::now() - Duration::new(limit_secs+1, 0)),
        }
    }

    pub fn and_then<F>(&self, op: F)
        where F: Fn() {
        let now = Instant::now();
        if now - self.last.get() > self.limit {
            self.last.set(now);
            op();
        }
    }
}

struct Period {
    start: Instant,
    end: Option<Instant>,
}

pub struct StepIntegrator {
    on_periods: Vec<Period>,
    limit: Duration,
}

impl StepIntegrator {
    pub fn new(limit: Duration) -> Self {
        StepIntegrator {
            on_periods: Vec::new(),
            limit: limit,
        }
    }

    pub fn turn(&mut self, on: bool) {
        self.trim();

        if self.on_periods.is_empty() {
            self.on_periods.push( Period { start: Instant::now(), end: None });
            return;
        }

        let current_on = self.on_periods.last().unwrap().end.is_none();
        if on == current_on {
            // state is unchanged
            return;
        }

        if on {
            self.on_periods.push( Period { start: Instant::now(), end: None });
        } else {
            self.on_periods.last_mut().unwrap().end = Some(Instant::now());
        }
    }

    pub fn set_limit(&mut self, limit: Duration) {
        self.limit = limit;
        self.trim();
    }

    fn integrate(&self) -> Duration {
        let durs = self.on_periods.iter().map(|p| {
            let end = p.end.unwrap_or_else(|| Instant::now());
            end - p.start
        });
        // TODO rust 1.16: impl Sum for Duration 
        // durs.sum()
        durs.fold(Duration::new(0,0), |acc, d| acc + d)
    }

    fn trim(&mut self) {
        let begin = Instant::now() - self.limit;

        self.on_periods.retain(|p| {
            // remove expired endtimes
            if let Some(e) = p.end {
                e >= begin && e != p.start
            } else {
                true
            }
        });

        for p in &mut self.on_periods {
            p.start = cmp::max(p.start, begin);
        }
    }
}

/// Takes a stream and returns a stream without errors.
pub fn consume_errors<S>(s: S) -> Box<Stream<Item=S::Item, Error=S::Error>> 
// XXX not sure why 'static is really necessary here?
    where 
    S: Stream+Send+'static, 
    <S as Stream>::Error: std::fmt::Display+Send+'static,
    <S as Stream>::Item: Send+'static,
    {
    s.then(|r| {
        match r {
            Ok(v) => Ok(Some(v)),
            Err(e) => {
                debug!("Stream error: {}", e);
                Ok(None)
            }
        }
    })
    .filter_map(|p| p).boxed()
}