Mercurial > templog
diff rust/src/fridge.rs @ 594:aff50ee77252 rust
rust working better now with streams and sinks.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Jan 2017 17:18:44 +0800 |
parents | bf138339d20a |
children | e87655ed8429 |
line wrap: on
line diff
--- a/rust/src/fridge.rs Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/src/fridge.rs Wed Jan 04 17:18:44 2017 +0800 @@ -1,65 +1,131 @@ +extern crate futures; +extern crate tokio_core; + +use std; use std::io; +use std::mem; use std::time::Duration; -use futures::Future; -use tokio_core::reactor::Timeout; -use tokio_core::reactor::Handle; +use futures::{Future,future,Sink,Stream}; +use tokio_core::reactor::{Timeout,Handle}; +use futures::sync::{mpsc}; use types::*; +#[derive(Debug)] +pub enum Message { + Sensor {wort: Option<f32>, fridge: Option<f32>}, + Params (Params), + Tick(u64), +} + pub struct Fridge { params: Params, temp_wort: Option<f32>, temp_fridge: Option<f32>, - // timeouts to wake ourself up again - //overshoot_timeout: Option<Future<Item=(), Error=io::Error>>, - //fridgeoff_timeout: Option<Future<Item=(), Error=io::Error>>, - wortvalid_timeout: Option<Timeout>, + // Timeouts to wake ourselves up again + handle: Handle, + timeout_s: mpsc::Sender<u64>, + timeout_r: Option<mpsc::Receiver<u64>>, + ticker: u64, +} + +impl Sink for Fridge { + + type SinkItem = Message; + type SinkError = std::io::Error; + + fn start_send(&mut self, msg: Message) + -> futures::StartSend<Self::SinkItem, Self::SinkError> { + self.process_msg(msg); + Ok(futures::AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { + Ok(futures::Async::Ready(())) + } } impl Fridge { - pub fn new(p: Params) -> Fridge { - Fridge { + pub fn new(p: Params, handle: &Handle) -> Fridge { + let (s, r) = mpsc::channel(1); + let mut f = Fridge { params: p, temp_wort: None, temp_fridge: None, - //overshoot_timeout: None, - //fridgeoff_timeout: None, - wortvalid_timeout: None, - } + + handle: handle.clone(), + timeout_s: s, + timeout_r: Some(r), + ticker: 0, + }; + f.tick(); + f } - fn tick(&mut self, handle: &Handle) { + /// Returns a stream of timeouts for fridge, waking when next necessary + pub fn timeouts(&mut self) + -> Box<Stream<Item=Message, Error=io::Error>> { + mem::replace(&mut self.timeout_r, None) + .expect("NumberWatcher::timeouts() can only be called once") + .map(|v| Message::Tick(v)) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel")) + .boxed() + } + fn next_wakeup(&self) -> Duration { + let millis = 400; + let dur = Duration::from_millis(millis); + dur + } + + fn tick(&mut self) { + debug!("tick"); + + self.send_next_timeout(); } - pub fn set_params(&mut self, handle: &Handle, p: Params) { - self.params = p; - println!("params {:?}", self.params); - - self.tick(handle); + /// Sets the next self-wakeup timeout + fn send_next_timeout(&mut self) { + let waker = self.timeout_s.clone(); + let dur = self.next_wakeup(); + debug!("fridge next wakeup {:?}", dur); + self.ticker += 1; + let v = self.ticker; + let t = Timeout::new(dur, &self.handle).unwrap() + .map_err(|_| ()) + .and_then(move |_| { + waker.send(v) + .map_err(|_| ()) + }) + .map(|_| ()); + self.handle.spawn(t); } - pub fn set_temps(&mut self, handle: &Handle, - wort: Option<f32>, fridge: Option<f32>) { + fn process_msg(&mut self, msg: Message) + -> Box<Future<Item=(), Error=()>> { + debug!("process_msg {:?}", msg); + match msg { + Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge), + Message::Params(p) => self.update_params(p), + Message::Tick(v) => if v == self.ticker {self.tick()}, + }; + future::ok::<(),()>(()).boxed() + } + + pub fn update_params(&mut self, p: Params) { + self.params = p; + println!("fridge set_params {:?}", self.params); + + self.tick(); + } + + pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) { self.temp_wort = wort; self.temp_fridge = fridge; - if let Some(_) = self.temp_wort { - // set a new timeout, replacing any existing - let dur = Duration::new(10, 0); // XXX - let t = Timeout::new(dur, handle).unwrap(); - /* - handle.spawn(t.and_then(|_| { - self.tick(handle); - Ok(()) - }).map_err(|x| ())); - */ - self.wortvalid_timeout = Some(t); - } - - self.tick(handle); + self.tick(); } }