view rust/src/fridge.rs @ 628:e1b5938de122 rust

futures sink? unsure what this was
author Matt Johnston <matt@ucc.asn.au>
date Fri, 19 Apr 2019 13:57:40 +0800
parents efcbe0d3afd6
children c57821a60e51
line wrap: on
line source

extern crate futures;
extern crate tokio_core;
#[cfg(target_os = "linux")]
extern crate sysfs_gpio;

use std;
use std::io;
use std::mem;
use std::error::Error;
use std::time::{Duration,Instant};

use futures::{Future,future,Sink,Stream};
use futures_sink::{Sink};
use tokio_core::reactor::{Timeout,Handle};
use futures::sync::{mpsc};

#[cfg(target_os = "linux")]
use self::sysfs_gpio::{Direction, Pin};

use config::Config;
use params::Params;
use types::*;

#[derive(Debug)]
pub enum Message {
    Sensor {wort: Option<f32>, fridge: Option<f32>},
    Params (Params),
    Tick(u64),
}

pub struct Fridge {
    params: Params,
    config: Config,

    on: bool,
    temp_wort: Option<f32>,
    temp_fridge: Option<f32>,
    last_off_time: Instant,
    wort_valid_time: Instant,
    integrator: StepIntegrator,
    control: FridgeControl,

    // Timeouts to wake ourselves up again
    handle: Handle,
    timeout_s: mpsc::Sender<u64>,
    timeout_r: Option<mpsc::Receiver<u64>>,
    ticker: u64,
}

impl Sink for Fridge {

    type SinkItem = Message;
    type SinkError = TemplogError;

    fn start_send(&mut self, msg: Message)
            -> Result<(), Self::SinkError> {
        self.process_msg(msg);
        Ok()
    }

    fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
        Ok(futures::Async::Ready(()))
    }
}

enum FridgeControl {
#[cfg(target_os = "linux")]
    Gpio(Pin),
    Fake,
}

struct TestControl {}

impl Drop for Fridge {
    fn drop(&mut self) {
        // safety fridge off 
        self.turn(false);
    }
}

impl Fridge {
    pub fn new(config: &Config, nowait: bool, p: Params, handle: &Handle) -> Fridge {
        let (s, r) = mpsc::channel(1);
        let mut f = Fridge { 
            config: config.clone(),
            params: p.clone(),
            on: false,
            temp_wort: None,
            temp_fridge: None,
            last_off_time: Instant::now(),
            wort_valid_time: Instant::now() - Duration::new(config.FRIDGE_WORT_INVALID_TIME, 100),
            integrator: StepIntegrator::new(Duration::new(p.overshoot_delay, 0)),
            control: Self::make_control(config),

            handle: handle.clone(),
            timeout_s: s,
            timeout_r: Some(r),
            ticker: 0,
        };

        if nowait {
            f.last_off_time -= Duration::new(config.FRIDGE_DELAY, 1);
        }

        f.tick();

        f
    }

#[cfg(target_os = "linux")]
    fn make_control(config: &Config) -> FridgeControl {
        let mut pin = Pin(config.FRIDGE_GPIO_PIN);
        // XXX better error handling?
        pin.export().expect("Exporting fridge gpio failed");
        pin.set_direction(Direction::Low).expect("Fridge gpio direction failed");
        FridgeControl::Gpio(pin)
    }

#[cfg(not(target_os = "linux"))]
    fn make_control(config: &Config) -> FridgeControl {
            FridgeControl::Fake
    }



    fn next_wakeup(&self) -> Duration {
        let millis = 400;
        let dur = Duration::from_millis(millis);
        dur
    }

    /// The fridge needs to periodically wake itself up, the returned
    /// stream of Tick messages does so.
    /// Examples of wakeups events are
    /// 
    ///  * overshoot calculation
    ///  * minimum fridge-off time
    ///  * invalid wort timeout
    /// All specified in next_wakeup()
    pub fn wakeups(&mut self)
            -> Box<Stream<Item=Message, Error=TemplogError>> {
        Box::new(mem::replace(&mut self.timeout_r, None)
            .expect("Fridge::wakeups() can only be called once")
            .map(|v| Message::Tick(v))
            .map_err(|e| TemplogError::new("wakeups() receive failed")))
    }

    fn turn_off(&mut self) {
        info!("Turning fridge off");
        self.turn(false);
    }

    fn turn_on(&mut self) {
        info!("Turning fridge on");
        self.turn(true);
    }

    fn turn(&mut self, on: bool) {
        match self.control {
#[cfg(target_os = "linux")]
            Gpio(pin) => pin.set_value(on as u8),
            FridgeControl::Fake => debug!("fridge turns {}", if on {"on"} else {"off"}),
        }
        self.on = on;
    }

    /// Turns the fridge off and on
    fn compare_temperatures(&mut self) {
        let fridge_min = self.params.fridge_setpoint - self.params.fridge_range_lower;
        let fridge_max = self.params.fridge_setpoint - self.params.fridge_range_upper;
        let wort_max = self.params.fridge_setpoint + self.params.fridge_difference;
        let off_time = Instant::now() - self.last_off_time;

        // Or elsewhere?
        self.integrator.set_limit(Duration::new(self.params.overshoot_delay, 0));

        // Safety to avoid bad things happening to the fridge motor (?)
        // When it turns off don't start up again for at least FRIDGE_DELAY
        if !self.on && off_time < Duration::new(self.config.FRIDGE_DELAY, 0) {
            info!("fridge skipping, too early");
            return;
        }

        if self.params.disabled {
            if self.on {
                info!("Disabled, turning fridge off");
                self.turn_off();
            }
            return;
        }

        // handle broken wort sensor
        if self.temp_wort.is_none() {
            let invalid_time = Instant::now() - self.wort_valid_time;
            warn!("Invalid wort sensor for {:?} secs", invalid_time);
            if invalid_time < Duration::new(self.config.FRIDGE_WORT_INVALID_TIME, 0) {
                warn!("Has only been invalid for {:?}, waiting", invalid_time);
                return;
            }
        }

        if self.temp_fridge.is_none() {
            warn!("Invalid fridge sensor");
        }

        if self.on {
            debug!("fridge is on");
            let on_time = self.integrator.integrate().as_secs() as f32;
            let on_ratio = on_time / self.params.overshoot_delay as f32;

            let overshoot = self.params.overshoot_factor as f32 * on_ratio;
            debug!("on_percent {}, overshoot {}", on_ratio * 100.0, overshoot);

            let mut turn_off = false;
            if self.temp_wort.is_some() && !self.params.nowort {
                let t = self.temp_wort.unwrap();
                // use the wort temperature
                if t - overshoot < self.params.fridge_setpoint {
                    info!("wort has cooled enough, {temp}º (overshoot {overshoot}º = {factor} × {percent}%)",
                         temp = t, overshoot = overshoot,
                         factor = self.params.overshoot_factor,
                         percent = on_ratio*100.0);
                    turn_off = true;
                }
            } else if let Some(t) = self.temp_fridge {
                // use the fridge temperature
                if t < fridge_min {
                    warn!("fridge off fallback, fridge {}, min {}", t, fridge_min);
                    if self.temp_wort.is_none() {
                        warn!("wort has been invalid for {:?}", Instant::now() - self.wort_valid_time);
                    }
                    turn_off = true;
                }
            }
            if turn_off {
                self.turn_off();
            }
        } else {
            debug!("fridge is off. fridge {:?} max {:?}. wort {:?} max {:?}",
                self.temp_fridge, fridge_max, self.temp_wort, wort_max);
            let mut turn_on = false;
            if self.temp_wort.is_some() && !self.params.nowort {
                // use the wort temperature
                let t = self.temp_wort.unwrap();
                if t >= wort_max {
                    info!("Wort is too hot {}°, max {}°", t, wort_max);
                    turn_on = true;
                }
            } 

            if let Some(t) = self.temp_fridge {
                if t >= fridge_max {
                    warn!("fridge too hot fallback, fridge {}°, max {}°", t, fridge_max);
                    turn_on = true;
                }
            }

            if turn_on {
                self.turn_on()
            }
        }
    }

    /// Must be called after every state change. Turns the fridge on/off as required and
    /// schedules any future wakeups based on the present (new) state
    fn tick(&mut self) {
        debug!("tick");

        self.compare_temperatures();
        self.send_next_timeout();
    }

    /// Sets the next self-wakeup timeout
    fn send_next_timeout(&mut self) {
        let waker = self.timeout_s.clone();
        let dur = self.next_wakeup();
        debug!("fridge next wakeup {:?}", dur);
        self.ticker += 1;
        let v = self.ticker;
        let t = Timeout::new(dur, &self.handle).unwrap()
            .map_err(|_| ())
            .and_then(move |_| {
                waker.send(v)
                    .map_err(|e| {
                        warn!("Send error in tick(): {}", e.to_string());
                        ()
                    })
            })
            .map(|_| ());
        self.handle.spawn(t);
    }

    fn process_msg(&mut self, msg: Message) {
        debug!("process_msg {:?}", msg);
        match msg {
            Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge),
            Message::Params(p) => self.update_params(p),
            Message::Tick(v) => if v == self.ticker {self.tick()}, // schedule a timeout if there are none pending
        };
    }

    pub fn update_params(&mut self, p: Params) {
        self.params = p;
        println!("fridge set_params {:?}", self.params);

        self.tick();
    }

    pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) {
        self.temp_wort = wort;
        self.temp_fridge = fridge;

    	if self.temp_wort.is_some() {
            self.wort_valid_time = Instant::now();
        }

        self.tick();
    }

}