Mercurial > templog
diff rust/src/fridge.rs @ 632:bde302def78e rust
moving to riker, nowhere near yet
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Thu, 22 Aug 2019 23:59:50 +0800 |
parents | c57821a60e51 |
children | 490e9e15b98c |
line wrap: on
line diff
--- a/rust/src/fridge.rs Sat Jul 06 18:28:34 2019 +0800 +++ b/rust/src/fridge.rs Thu Aug 22 23:59:50 2019 +0800 @@ -1,29 +1,26 @@ -#[cfg(target_os = "linux")] -extern crate sysfs_gpio; +use std; -use std; -use std::io; -use std::mem; -use std::error::Error; use std::time::{Duration,Instant}; - -use futures::{Future,Stream}; -use futures::sync::{mpsc}; +use riker::actors::*; #[cfg(target_os = "linux")] use self::sysfs_gpio::{Direction, Pin}; -use config::Config; -use params::Params; -use types::*; +use super::config::Config; +use super::params::Params; +use super::types::*; #[derive(Debug)] -pub enum Message { - Sensor {wort: Option<f32>, fridge: Option<f32>}, - Params (Params), - Tick(u64), +pub struct Reading { + wort: Option<f32>, + fridge: Option<f32>, } +#[derive(Debug)] +pub struct Tick; + + +#[actor(Params, Tick, Reading)] pub struct Fridge { params: Params, config: Config, @@ -35,27 +32,60 @@ wort_valid_time: Instant, integrator: StepIntegrator, control: FridgeControl, +} - // Timeouts to wake ourselves up again - handle: Handle, - timeout_s: mpsc::Sender<u64>, - timeout_r: Option<mpsc::Receiver<u64>>, - ticker: u64, +impl Actor for Fridge { + type Msg = FridgeMsg; + fn recv(&mut self, + ctx: &Context<Self::Msg>, + msg: Self::Msg, + sender: Sender) { + self.receive(ctx, msg, sender); + // TODO: should we do self.tick(ctx) here instead? + } + + fn post_start(&mut self, ctx: &Context<Self::Msg>) { + self.tick(ctx); + } } -impl Sink for Fridge { +impl Receive<Reading> for Fridge { + type Msg = FridgeMsg; + fn receive(&mut self, + ctx: &Context<Self::Msg>, + r: Reading, + _sender: Sender) { + self.temp_wort = r.wort; + self.temp_fridge = r.fridge; - type SinkItem = Message; - type SinkError = TemplogError; + if self.temp_wort.is_some() { + self.wort_valid_time = Instant::now(); + } + + self.tick(ctx); + } +} - fn start_send(&mut self, msg: Message) - -> Result<(), Self::SinkError> { - self.process_msg(msg); - Ok() +impl Receive<Params> for Fridge { + type Msg = FridgeMsg; + fn receive(&mut self, + ctx: &Context<Self::Msg>, + p: Params, + _sender: Sender) { + self.params = p; + println!("fridge set_params {:?}", self.params); + + self.tick(ctx); } +} - fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { - Ok(futures::Async::Ready(())) +impl Receive<Tick> for Fridge { + type Msg = FridgeMsg; + fn receive(&mut self, + ctx: &Context<Self::Msg>, + _tick: Tick, + _sender: Sender) { + self.tick(ctx); } } @@ -65,8 +95,6 @@ Fake, } -struct TestControl {} - impl Drop for Fridge { fn drop(&mut self) { // safety fridge off @@ -75,8 +103,7 @@ } impl Fridge { - pub fn new(config: &Config, nowait: bool, p: Params, handle: &Handle) -> Fridge { - let (s, r) = mpsc::channel(1); + pub fn new(config: &Config, nowait: bool, p: Params) -> Fridge { let mut f = Fridge { config: config.clone(), params: p.clone(), @@ -87,11 +114,6 @@ wort_valid_time: Instant::now() - Duration::new(config.FRIDGE_WORT_INVALID_TIME, 100), integrator: StepIntegrator::new(Duration::new(p.overshoot_delay, 0)), control: Self::make_control(config), - - handle: handle.clone(), - timeout_s: s, - timeout_r: Some(r), - ticker: 0, }; if nowait { @@ -117,29 +139,12 @@ FridgeControl::Fake } - - fn next_wakeup(&self) -> Duration { let millis = 400; let dur = Duration::from_millis(millis); dur } - /// The fridge needs to periodically wake itself up, the returned - /// stream of Tick messages does so. - /// Examples of wakeups events are - /// - /// * overshoot calculation - /// * minimum fridge-off time - /// * invalid wort timeout - /// All specified in next_wakeup() - pub fn wakeups(&mut self) - -> Box<Stream<Item=Message, Error=TemplogError>> { - Box::new(mem::replace(&mut self.timeout_r, None) - .expect("Fridge::wakeups() can only be called once") - .map(|v| Message::Tick(v)) - .map_err(|e| TemplogError::new("wakeups() receive failed"))) - } fn turn_off(&mut self) { info!("Turning fridge off"); @@ -160,7 +165,7 @@ self.on = on; } - /// Turns the fridge off and on + // Turns the fridge off and on fn compare_temperatures(&mut self) { let fridge_min = self.params.fridge_setpoint - self.params.fridge_range_lower; let fridge_max = self.params.fridge_setpoint - self.params.fridge_range_upper; @@ -259,58 +264,20 @@ /// Must be called after every state change. Turns the fridge on/off as required and /// schedules any future wakeups based on the present (new) state - fn tick(&mut self) { + /// Examples of wakeups events are + /// + /// * overshoot calculation + /// * minimum fridge-off time + /// * invalid wort timeout + /// All specified in next_wakeup() + fn tick(&mut self, + ctx: &Context<Self::Msg>) { debug!("tick"); self.compare_temperatures(); - self.send_next_timeout(); - } - /// Sets the next self-wakeup timeout - fn send_next_timeout(&mut self) { - let waker = self.timeout_s.clone(); + // Sets the next self-wakeup timeout let dur = self.next_wakeup(); - debug!("fridge next wakeup {:?}", dur); - self.ticker += 1; - let v = self.ticker; - let t = Timeout::new(dur, &self.handle).unwrap() - .map_err(|_| ()) - .and_then(move |_| { - waker.send(v) - .map_err(|e| { - warn!("Send error in tick(): {}", e.to_string()); - () - }) - }) - .map(|_| ()); - self.handle.spawn(t); + ctx.schedule_once(dur, self, None, Tick); } - - fn process_msg(&mut self, msg: Message) { - debug!("process_msg {:?}", msg); - match msg { - Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge), - Message::Params(p) => self.update_params(p), - Message::Tick(v) => if v == self.ticker {self.tick()}, // schedule a timeout if there are none pending - }; - } - - pub fn update_params(&mut self, p: Params) { - self.params = p; - println!("fridge set_params {:?}", self.params); - - self.tick(); - } - - pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) { - self.temp_wort = wort; - self.temp_fridge = fridge; - - if self.temp_wort.is_some() { - self.wort_valid_time = Instant::now(); - } - - self.tick(); - } - }