diff rust/src/fridge.rs @ 632:bde302def78e rust

moving to riker, nowhere near yet
author Matt Johnston <matt@ucc.asn.au>
date Thu, 22 Aug 2019 23:59:50 +0800
parents c57821a60e51
children 490e9e15b98c
line wrap: on
line diff
--- a/rust/src/fridge.rs	Sat Jul 06 18:28:34 2019 +0800
+++ b/rust/src/fridge.rs	Thu Aug 22 23:59:50 2019 +0800
@@ -1,29 +1,26 @@
-#[cfg(target_os = "linux")]
-extern crate sysfs_gpio;
+use std;
 
-use std;
-use std::io;
-use std::mem;
-use std::error::Error;
 use std::time::{Duration,Instant};
-
-use futures::{Future,Stream};
-use futures::sync::{mpsc};
+use riker::actors::*;
 
 #[cfg(target_os = "linux")]
 use self::sysfs_gpio::{Direction, Pin};
 
-use config::Config;
-use params::Params;
-use types::*;
+use super::config::Config;
+use super::params::Params;
+use super::types::*;
 
 #[derive(Debug)]
-pub enum Message {
-    Sensor {wort: Option<f32>, fridge: Option<f32>},
-    Params (Params),
-    Tick(u64),
+pub struct Reading {
+    wort: Option<f32>, 
+    fridge: Option<f32>,
 }
 
+#[derive(Debug)]
+pub struct Tick;
+
+
+#[actor(Params, Tick, Reading)]
 pub struct Fridge {
     params: Params,
     config: Config,
@@ -35,27 +32,60 @@
     wort_valid_time: Instant,
     integrator: StepIntegrator,
     control: FridgeControl,
+}
 
-    // Timeouts to wake ourselves up again
-    handle: Handle,
-    timeout_s: mpsc::Sender<u64>,
-    timeout_r: Option<mpsc::Receiver<u64>>,
-    ticker: u64,
+impl Actor for Fridge {
+    type Msg = FridgeMsg;
+    fn recv(&mut self,
+                ctx: &Context<Self::Msg>,
+                msg: Self::Msg,
+                sender: Sender) {
+        self.receive(ctx, msg, sender);
+        // TODO: should we do self.tick(ctx) here instead?
+    }
+
+    fn post_start(&mut self, ctx: &Context<Self::Msg>) {
+        self.tick(ctx);
+    }
 }
 
-impl Sink for Fridge {
+impl Receive<Reading> for Fridge {
+    type Msg = FridgeMsg;
+    fn receive(&mut self,
+                ctx: &Context<Self::Msg>,
+                r: Reading,
+                _sender: Sender) {
+        self.temp_wort = r.wort;
+        self.temp_fridge = r.fridge;
 
-    type SinkItem = Message;
-    type SinkError = TemplogError;
+        if self.temp_wort.is_some() {
+            self.wort_valid_time = Instant::now();
+        }
+
+        self.tick(ctx);
+    }
+}
 
-    fn start_send(&mut self, msg: Message)
-            -> Result<(), Self::SinkError> {
-        self.process_msg(msg);
-        Ok()
+impl Receive<Params> for Fridge {
+    type Msg = FridgeMsg;
+    fn receive(&mut self,
+                ctx: &Context<Self::Msg>,
+                p: Params,
+                _sender: Sender) {
+        self.params = p;
+        println!("fridge set_params {:?}", self.params);
+
+        self.tick(ctx);
     }
+}
 
-    fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
-        Ok(futures::Async::Ready(()))
+impl Receive<Tick> for Fridge {
+    type Msg = FridgeMsg;
+    fn receive(&mut self,
+                ctx: &Context<Self::Msg>,
+                _tick: Tick,
+                _sender: Sender) {
+        self.tick(ctx);
     }
 }
 
@@ -65,8 +95,6 @@
     Fake,
 }
 
-struct TestControl {}
-
 impl Drop for Fridge {
     fn drop(&mut self) {
         // safety fridge off 
@@ -75,8 +103,7 @@
 }
 
 impl Fridge {
-    pub fn new(config: &Config, nowait: bool, p: Params, handle: &Handle) -> Fridge {
-        let (s, r) = mpsc::channel(1);
+    pub fn new(config: &Config, nowait: bool, p: Params) -> Fridge {
         let mut f = Fridge { 
             config: config.clone(),
             params: p.clone(),
@@ -87,11 +114,6 @@
             wort_valid_time: Instant::now() - Duration::new(config.FRIDGE_WORT_INVALID_TIME, 100),
             integrator: StepIntegrator::new(Duration::new(p.overshoot_delay, 0)),
             control: Self::make_control(config),
-
-            handle: handle.clone(),
-            timeout_s: s,
-            timeout_r: Some(r),
-            ticker: 0,
         };
 
         if nowait {
@@ -117,29 +139,12 @@
             FridgeControl::Fake
     }
 
-
-
     fn next_wakeup(&self) -> Duration {
         let millis = 400;
         let dur = Duration::from_millis(millis);
         dur
     }
 
-    /// The fridge needs to periodically wake itself up, the returned
-    /// stream of Tick messages does so.
-    /// Examples of wakeups events are
-    /// 
-    ///  * overshoot calculation
-    ///  * minimum fridge-off time
-    ///  * invalid wort timeout
-    /// All specified in next_wakeup()
-    pub fn wakeups(&mut self)
-            -> Box<Stream<Item=Message, Error=TemplogError>> {
-        Box::new(mem::replace(&mut self.timeout_r, None)
-            .expect("Fridge::wakeups() can only be called once")
-            .map(|v| Message::Tick(v))
-            .map_err(|e| TemplogError::new("wakeups() receive failed")))
-    }
 
     fn turn_off(&mut self) {
         info!("Turning fridge off");
@@ -160,7 +165,7 @@
         self.on = on;
     }
 
-    /// Turns the fridge off and on
+    // Turns the fridge off and on
     fn compare_temperatures(&mut self) {
         let fridge_min = self.params.fridge_setpoint - self.params.fridge_range_lower;
         let fridge_max = self.params.fridge_setpoint - self.params.fridge_range_upper;
@@ -259,58 +264,20 @@
 
     /// Must be called after every state change. Turns the fridge on/off as required and
     /// schedules any future wakeups based on the present (new) state
-    fn tick(&mut self) {
+    /// Examples of wakeups events are
+    /// 
+    ///  * overshoot calculation
+    ///  * minimum fridge-off time
+    ///  * invalid wort timeout
+    /// All specified in next_wakeup()
+    fn tick(&mut self,
+        ctx: &Context<Self::Msg>) {
         debug!("tick");
 
         self.compare_temperatures();
-        self.send_next_timeout();
-    }
 
-    /// Sets the next self-wakeup timeout
-    fn send_next_timeout(&mut self) {
-        let waker = self.timeout_s.clone();
+        // Sets the next self-wakeup timeout
         let dur = self.next_wakeup();
-        debug!("fridge next wakeup {:?}", dur);
-        self.ticker += 1;
-        let v = self.ticker;
-        let t = Timeout::new(dur, &self.handle).unwrap()
-            .map_err(|_| ())
-            .and_then(move |_| {
-                waker.send(v)
-                    .map_err(|e| {
-                        warn!("Send error in tick(): {}", e.to_string());
-                        ()
-                    })
-            })
-            .map(|_| ());
-        self.handle.spawn(t);
+        ctx.schedule_once(dur, self, None, Tick);
     }
-
-    fn process_msg(&mut self, msg: Message) {
-        debug!("process_msg {:?}", msg);
-        match msg {
-            Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge),
-            Message::Params(p) => self.update_params(p),
-            Message::Tick(v) => if v == self.ticker {self.tick()}, // schedule a timeout if there are none pending
-        };
-    }
-
-    pub fn update_params(&mut self, p: Params) {
-        self.params = p;
-        println!("fridge set_params {:?}", self.params);
-
-        self.tick();
-    }
-
-    pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) {
-        self.temp_wort = wort;
-        self.temp_fridge = fridge;
-
-    	if self.temp_wort.is_some() {
-            self.wort_valid_time = Instant::now();
-        }
-
-        self.tick();
-    }
-
 }