Mercurial > templog
view rust/src/fridge.rs @ 594:aff50ee77252 rust
rust working better now with streams and sinks.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Jan 2017 17:18:44 +0800 |
parents | bf138339d20a |
children | e87655ed8429 |
line wrap: on
line source
extern crate futures; extern crate tokio_core; use std; use std::io; use std::mem; use std::time::Duration; use futures::{Future,future,Sink,Stream}; use tokio_core::reactor::{Timeout,Handle}; use futures::sync::{mpsc}; use types::*; #[derive(Debug)] pub enum Message { Sensor {wort: Option<f32>, fridge: Option<f32>}, Params (Params), Tick(u64), } pub struct Fridge { params: Params, temp_wort: Option<f32>, temp_fridge: Option<f32>, // Timeouts to wake ourselves up again handle: Handle, timeout_s: mpsc::Sender<u64>, timeout_r: Option<mpsc::Receiver<u64>>, ticker: u64, } impl Sink for Fridge { type SinkItem = Message; type SinkError = std::io::Error; fn start_send(&mut self, msg: Message) -> futures::StartSend<Self::SinkItem, Self::SinkError> { self.process_msg(msg); Ok(futures::AsyncSink::Ready) } fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { Ok(futures::Async::Ready(())) } } impl Fridge { pub fn new(p: Params, handle: &Handle) -> Fridge { let (s, r) = mpsc::channel(1); let mut f = Fridge { params: p, temp_wort: None, temp_fridge: None, handle: handle.clone(), timeout_s: s, timeout_r: Some(r), ticker: 0, }; f.tick(); f } /// Returns a stream of timeouts for fridge, waking when next necessary pub fn timeouts(&mut self) -> Box<Stream<Item=Message, Error=io::Error>> { mem::replace(&mut self.timeout_r, None) .expect("NumberWatcher::timeouts() can only be called once") .map(|v| Message::Tick(v)) .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel")) .boxed() } fn next_wakeup(&self) -> Duration { let millis = 400; let dur = Duration::from_millis(millis); dur } fn tick(&mut self) { debug!("tick"); self.send_next_timeout(); } /// Sets the next self-wakeup timeout fn send_next_timeout(&mut self) { let waker = self.timeout_s.clone(); let dur = self.next_wakeup(); debug!("fridge next wakeup {:?}", dur); self.ticker += 1; let v = self.ticker; let t = Timeout::new(dur, &self.handle).unwrap() .map_err(|_| ()) .and_then(move |_| { waker.send(v) .map_err(|_| ()) }) .map(|_| ()); self.handle.spawn(t); } fn process_msg(&mut self, msg: Message) -> Box<Future<Item=(), Error=()>> { debug!("process_msg {:?}", msg); match msg { Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge), Message::Params(p) => self.update_params(p), Message::Tick(v) => if v == self.ticker {self.tick()}, }; future::ok::<(),()>(()).boxed() } pub fn update_params(&mut self, p: Params) { self.params = p; println!("fridge set_params {:?}", self.params); self.tick(); } pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) { self.temp_wort = wort; self.temp_fridge = fridge; self.tick(); } }