Mercurial > templog
view rust/src/fridge.rs @ 620:8fda564cc46f rust
fridge work
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Thu, 23 Mar 2017 00:20:22 +0800 |
parents | f153aec221be |
children | 8136a6b99866 |
line wrap: on
line source
extern crate futures; extern crate tokio_core; extern crate sysfs_gpio; use std; use std::io; use std::mem; use std::error::Error; use std::time::{Duration,Instant}; use futures::{Future,future,Sink,Stream}; use tokio_core::reactor::{Timeout,Handle}; use futures::sync::{mpsc}; use sysfs_gpio::{Direction, Pin}; use config::Config; use params::Params; use types::*; #[derive(Debug)] pub enum Message { Sensor {wort: Option<f32>, fridge: Option<f32>}, Params (Params), Tick(u64), } pub struct Fridge { params: Params, config: Config, on: bool, temp_wort: Option<f32>, temp_fridge: Option<f32>, last_off_time: Instant, wort_valid_time: Instant, integrator: StepIntegrator, gpio: Pin, // Timeouts to wake ourselves up again handle: Handle, timeout_s: mpsc::Sender<u64>, timeout_r: Option<mpsc::Receiver<u64>>, ticker: u64, } impl Sink for Fridge { type SinkItem = Message; type SinkError = TemplogError; fn start_send(&mut self, msg: Message) -> futures::StartSend<Self::SinkItem, Self::SinkError> { self.process_msg(msg); Ok(futures::AsyncSink::Ready) } fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { Ok(futures::Async::Ready(())) } } impl Drop for Fridge { fn drop(&mut self) { // safety fridge off self.gpio.set_state(0); } } impl Fridge { pub fn new(config: &Config, nowait: bool, p: Params, handle: &Handle) -> Fridge { let (s, r) = mpsc::channel(1); let mut f = Fridge { config: config.clone(), params: p.clone(), on: false, temp_wort: None, temp_fridge: None, last_off_time: Instant::now(), wort_valid_time: Instant::now() - Duration::new(config.FRIDGE_WORT_INVALID_TIME, 100), integrator: StepIntegrator::new(p.overshoot_delay), gpio: Pin::new(config.FRIDGE_GPIO_PIN), handle: handle.clone(), timeout_s: s, timeout_r: Some(r), ticker: 0, }; if nowait { f.last_off_time -= Duration::new(config.FRIDGE_DELAY, 1); } // XXX better error handling? f.gpio.export().expect("Exporting fridge gpio failed"); f.gpio.set_direction(Direction::Low).expect("Fridge gpio direction failed"); f.tick(); f } fn next_wakeup(&self) -> Duration { let millis = 400; let dur = Duration::from_millis(millis); dur } /// The fridge needs to periodically wake itself up, the returned /// stream of Tick messages does so. /// Examples of wakeups events are /// /// * overshoot calculation /// * minimum fridge-off time /// * invalid wort timeout /// All specified in next_wakeup() pub fn wakeups(&mut self) -> Box<Stream<Item=Message, Error=TemplogError>> { mem::replace(&mut self.timeout_r, None) .expect("Fridge::wakeups() can only be called once") .map(|v| Message::Tick(v)) .map_err(|e| TemplogError::new("wakeups() receive failed")) .boxed() } fn turn_off(&mut self) { log!("Turning fridge off"); self.turn(false); } fn turn_on(&mut self) { log!("Turning fridge on"); self.turn(true); } fn turn(&mut self, on: bool) { self.on = on; self.gpio.set_value(on as u8) } /// Turns the fridge off and on fn compare_temperatures(&mut self) { let fridge_min = self.params.fridge_setpoint - self.params.fridge_range_lower; let fridge_max = self.params.fridge_setpoint - self.params.fridge_range_upper; let wort_max = self.params.fridge_setpoint + self.params.fridge_difference; let off_time = Instant::now() - self.last_off_time; // Or elsewhere? self.integrator.set_limit(self.params.overshoot_delay); // Safety to avoid bad things happening to the fridge motor (?) // When it turns off don't start up again for at least FRIDGE_DELAY if !self.on && off_time < self.config.FRIDGE_DELAY { log!("fridge skipping, too early"); return; } if self.params.disabled { if self.on { log!("Disabled, turning fridge off"); self.turn_off(); } return; } // handle broken wort sensor if self.temp_wort.is_none() { let invalid_time = Instant::now() - self.wort_valid_time; warn!("Invalid wort sensor for {:?} secs", invalid_time); if invalid_time < self.config.FRIDGE_WORT_INVALID_TIME { warn!("Has only been invalid for {:?}, waiting", invalid_time); return; } } if self.temp_fridge.is_none() { warn!("Invalid fridge sensor"); } if self.on { debug!("fridge is on"); let on_time = self.integrator.integrate().as_secs() as f32; let on_ratio = on_time / params.overshoot_delay as f32; overshoot = params.overshoot_factor as f32 * on_ratio; debug!("on_time {}, overshoot {}", on_percent, overshoot); let mut turn_off = false; if let Some(t) = self.temp_wort && !params.nowort { // use the wort temperature if t - overshoot < params.fridge_setpoint { log!("wort has cooled enough, {temp}º (overshoot {overshoot}º = {factor} × {percent}%)", temp = t, overshoot = overshoot, factor = params.overshoot_factor, percent = on_ratio*100); turn_off = true; } } else let if Some(t) = self.temp_fridge { // use the fridge temperature if t < fridge_min { warn!("fridge off fallback, fridge {}, min {}", t, fridge_min); if self.temp_wort.is_none() { W("wort has been invalid for {:?}", Instant::now() - self.wort_valid_time); } turn_off = true; } } if turn_off { self.turn_off(); } } else { debug!("fridge is off. fridge {:?} max {:?}. wort {:?} max {:?}", self.temp_fridge, fridge_max, self.temp_wort, wort_max); let mut turn_on = false; if let Some(t) = self.temp_wort && !params.nowort { // use the wort temperature if t >= wort_max { log!("Wort is too hot {}°, max {}°", t, wort_max); turn_on = true; } } if let Some(t) = self.temp_fridge { if t >= fridge_max { warn!("fridge too hot fallback, fridge {}°, max {}°", t, fridge_max) turn_on = true; } } if turn_on { self.turn_on() } } } /// Must be called after every state change. Turns the fridge on/off as required and /// schedules any future wakeups based on the present (new) state fn tick(&mut self) { debug!("tick"); self.compare_temperatures() self.send_next_timeout(); } /// Sets the next self-wakeup timeout fn send_next_timeout(&mut self) { let waker = self.timeout_s.clone(); let dur = self.next_wakeup(); debug!("fridge next wakeup {:?}", dur); self.ticker += 1; let v = self.ticker; let t = Timeout::new(dur, &self.handle).unwrap() .map_err(|_| ()) .and_then(move |_| { waker.send(v) .map_err(|e| { warn!("Send error in tick(): {}", e.to_string()); () }) }) .map(|_| ()); self.handle.spawn(t); } fn process_msg(&mut self, msg: Message) { debug!("process_msg {:?}", msg); match msg { Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge), Message::Params(p) => self.update_params(p), Message::Tick(v) => if v == self.ticker {self.tick()}, // schedule a timeout if there are none pending }; } pub fn update_params(&mut self, p: Params) { self.params = p; println!("fridge set_params {:?}", self.params); self.tick(); } pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) { self.temp_wort = wort; self.temp_fridge = fridge; if self.temp_wort.is_some() { self.wort_valid_time = Instant::now(); } self.tick(); } }