diff rust/src/fridge.rs @ 594:aff50ee77252 rust

rust working better now with streams and sinks.
author Matt Johnston <matt@ucc.asn.au>
date Wed, 04 Jan 2017 17:18:44 +0800
parents bf138339d20a
children e87655ed8429
line wrap: on
line diff
--- a/rust/src/fridge.rs	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/src/fridge.rs	Wed Jan 04 17:18:44 2017 +0800
@@ -1,65 +1,131 @@
+extern crate futures;
+extern crate tokio_core;
+
+use std;
 use std::io;
+use std::mem;
 use std::time::Duration;
 
-use futures::Future;
-use tokio_core::reactor::Timeout;
-use tokio_core::reactor::Handle;
+use futures::{Future,future,Sink,Stream};
+use tokio_core::reactor::{Timeout,Handle};
+use futures::sync::{mpsc};
 
 use types::*;
 
+#[derive(Debug)]
+pub enum Message {
+    Sensor {wort: Option<f32>, fridge: Option<f32>},
+    Params (Params),
+    Tick(u64),
+}
+
 pub struct Fridge {
     params: Params,
     temp_wort: Option<f32>,
     temp_fridge: Option<f32>,
 
-    // timeouts to wake ourself up again
-    //overshoot_timeout: Option<Future<Item=(), Error=io::Error>>,
-    //fridgeoff_timeout: Option<Future<Item=(), Error=io::Error>>,
-    wortvalid_timeout: Option<Timeout>,
+    // Timeouts to wake ourselves up again
+    handle: Handle,
+    timeout_s: mpsc::Sender<u64>,
+    timeout_r: Option<mpsc::Receiver<u64>>,
+    ticker: u64,
+}
+
+impl Sink for Fridge {
+
+    type SinkItem = Message;
+    type SinkError = std::io::Error;
+
+    fn start_send(&mut self, msg: Message)
+            -> futures::StartSend<Self::SinkItem, Self::SinkError> {
+        self.process_msg(msg);
+        Ok(futures::AsyncSink::Ready)
+    }
+
+    fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
+        Ok(futures::Async::Ready(()))
+    }
 }
 
 impl Fridge {
-    pub fn new(p: Params) -> Fridge {
-        Fridge { 
+    pub fn new(p: Params, handle: &Handle) -> Fridge {
+        let (s, r) = mpsc::channel(1);
+        let mut f = Fridge { 
             params: p,
             temp_wort: None,
             temp_fridge: None,
-            //overshoot_timeout: None,
-            //fridgeoff_timeout: None,
-            wortvalid_timeout: None,
-        }
+
+            handle: handle.clone(),
+            timeout_s: s,
+            timeout_r: Some(r),
+            ticker: 0,
+        };
+        f.tick();
+        f
     }
 
-    fn tick(&mut self, handle: &Handle) {
+    /// Returns a stream of timeouts for fridge, waking when next necessary
+    pub fn timeouts(&mut self)
+            -> Box<Stream<Item=Message, Error=io::Error>> {
+        mem::replace(&mut self.timeout_r, None)
+            .expect("NumberWatcher::timeouts() can only be called once")
+            .map(|v| Message::Tick(v))
+            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel"))
+            .boxed()
+    }
 
+    fn next_wakeup(&self) -> Duration {
+        let millis = 400;
+        let dur = Duration::from_millis(millis);
+        dur
+    }
+
+    fn tick(&mut self) {
+        debug!("tick");
+
+        self.send_next_timeout();
     }
 
-    pub fn set_params(&mut self, handle: &Handle, p: Params) {
-        self.params = p;
-        println!("params {:?}", self.params);
-
-        self.tick(handle);
+    /// Sets the next self-wakeup timeout
+    fn send_next_timeout(&mut self) {
+        let waker = self.timeout_s.clone();
+        let dur = self.next_wakeup();
+        debug!("fridge next wakeup {:?}", dur);
+        self.ticker += 1;
+        let v = self.ticker;
+        let t = Timeout::new(dur, &self.handle).unwrap()
+            .map_err(|_| ())
+            .and_then(move |_| {
+                waker.send(v)
+                    .map_err(|_| ())
+            })
+            .map(|_| ());
+        self.handle.spawn(t);
     }
 
-    pub fn set_temps(&mut self, handle: &Handle, 
-                wort: Option<f32>, fridge: Option<f32>) {
+    fn process_msg(&mut self, msg: Message)
+            -> Box<Future<Item=(), Error=()>> {
+        debug!("process_msg {:?}", msg);
+        match msg {
+            Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge),
+            Message::Params(p) => self.update_params(p),
+            Message::Tick(v) => if v == self.ticker {self.tick()},
+        };
+        future::ok::<(),()>(()).boxed()
+    }
+
+    pub fn update_params(&mut self, p: Params) {
+        self.params = p;
+        println!("fridge set_params {:?}", self.params);
+
+        self.tick();
+    }
+
+    pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) {
         self.temp_wort = wort;
         self.temp_fridge = fridge;
 
-        if let Some(_) = self.temp_wort {
-            // set a new timeout, replacing any existing
-            let dur = Duration::new(10, 0); // XXX
-            let t = Timeout::new(dur, handle).unwrap();
-            /*
-            handle.spawn(t.and_then(|_| {
-                                self.tick(handle);
-                                Ok(())
-                            }).map_err(|x| ()));
-                            */
-            self.wortvalid_timeout = Some(t);
-        }
-
-        self.tick(handle);
+        self.tick();
     }
 
 }