Mercurial > templog
comparison rust/src/fridge.rs @ 594:aff50ee77252 rust
rust working better now with streams and sinks.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Jan 2017 17:18:44 +0800 |
parents | bf138339d20a |
children | e87655ed8429 |
comparison
equal
deleted
inserted
replaced
593:bf138339d20a | 594:aff50ee77252 |
---|---|
1 extern crate futures; | |
2 extern crate tokio_core; | |
3 | |
4 use std; | |
1 use std::io; | 5 use std::io; |
6 use std::mem; | |
2 use std::time::Duration; | 7 use std::time::Duration; |
3 | 8 |
4 use futures::Future; | 9 use futures::{Future,future,Sink,Stream}; |
5 use tokio_core::reactor::Timeout; | 10 use tokio_core::reactor::{Timeout,Handle}; |
6 use tokio_core::reactor::Handle; | 11 use futures::sync::{mpsc}; |
7 | 12 |
8 use types::*; | 13 use types::*; |
14 | |
15 #[derive(Debug)] | |
16 pub enum Message { | |
17 Sensor {wort: Option<f32>, fridge: Option<f32>}, | |
18 Params (Params), | |
19 Tick(u64), | |
20 } | |
9 | 21 |
10 pub struct Fridge { | 22 pub struct Fridge { |
11 params: Params, | 23 params: Params, |
12 temp_wort: Option<f32>, | 24 temp_wort: Option<f32>, |
13 temp_fridge: Option<f32>, | 25 temp_fridge: Option<f32>, |
14 | 26 |
15 // timeouts to wake ourself up again | 27 // Timeouts to wake ourselves up again |
16 //overshoot_timeout: Option<Future<Item=(), Error=io::Error>>, | 28 handle: Handle, |
17 //fridgeoff_timeout: Option<Future<Item=(), Error=io::Error>>, | 29 timeout_s: mpsc::Sender<u64>, |
18 wortvalid_timeout: Option<Timeout>, | 30 timeout_r: Option<mpsc::Receiver<u64>>, |
31 ticker: u64, | |
32 } | |
33 | |
34 impl Sink for Fridge { | |
35 | |
36 type SinkItem = Message; | |
37 type SinkError = std::io::Error; | |
38 | |
39 fn start_send(&mut self, msg: Message) | |
40 -> futures::StartSend<Self::SinkItem, Self::SinkError> { | |
41 self.process_msg(msg); | |
42 Ok(futures::AsyncSink::Ready) | |
43 } | |
44 | |
45 fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { | |
46 Ok(futures::Async::Ready(())) | |
47 } | |
19 } | 48 } |
20 | 49 |
21 impl Fridge { | 50 impl Fridge { |
22 pub fn new(p: Params) -> Fridge { | 51 pub fn new(p: Params, handle: &Handle) -> Fridge { |
23 Fridge { | 52 let (s, r) = mpsc::channel(1); |
53 let mut f = Fridge { | |
24 params: p, | 54 params: p, |
25 temp_wort: None, | 55 temp_wort: None, |
26 temp_fridge: None, | 56 temp_fridge: None, |
27 //overshoot_timeout: None, | 57 |
28 //fridgeoff_timeout: None, | 58 handle: handle.clone(), |
29 wortvalid_timeout: None, | 59 timeout_s: s, |
30 } | 60 timeout_r: Some(r), |
61 ticker: 0, | |
62 }; | |
63 f.tick(); | |
64 f | |
31 } | 65 } |
32 | 66 |
33 fn tick(&mut self, handle: &Handle) { | 67 /// Returns a stream of timeouts for fridge, waking when next necessary |
34 | 68 pub fn timeouts(&mut self) |
69 -> Box<Stream<Item=Message, Error=io::Error>> { | |
70 mem::replace(&mut self.timeout_r, None) | |
71 .expect("NumberWatcher::timeouts() can only be called once") | |
72 .map(|v| Message::Tick(v)) | |
73 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel")) | |
74 .boxed() | |
35 } | 75 } |
36 | 76 |
37 pub fn set_params(&mut self, handle: &Handle, p: Params) { | 77 fn next_wakeup(&self) -> Duration { |
38 self.params = p; | 78 let millis = 400; |
39 println!("params {:?}", self.params); | 79 let dur = Duration::from_millis(millis); |
40 | 80 dur |
41 self.tick(handle); | |
42 } | 81 } |
43 | 82 |
44 pub fn set_temps(&mut self, handle: &Handle, | 83 fn tick(&mut self) { |
45 wort: Option<f32>, fridge: Option<f32>) { | 84 debug!("tick"); |
85 | |
86 self.send_next_timeout(); | |
87 } | |
88 | |
89 /// Sets the next self-wakeup timeout | |
90 fn send_next_timeout(&mut self) { | |
91 let waker = self.timeout_s.clone(); | |
92 let dur = self.next_wakeup(); | |
93 debug!("fridge next wakeup {:?}", dur); | |
94 self.ticker += 1; | |
95 let v = self.ticker; | |
96 let t = Timeout::new(dur, &self.handle).unwrap() | |
97 .map_err(|_| ()) | |
98 .and_then(move |_| { | |
99 waker.send(v) | |
100 .map_err(|_| ()) | |
101 }) | |
102 .map(|_| ()); | |
103 self.handle.spawn(t); | |
104 } | |
105 | |
106 fn process_msg(&mut self, msg: Message) | |
107 -> Box<Future<Item=(), Error=()>> { | |
108 debug!("process_msg {:?}", msg); | |
109 match msg { | |
110 Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge), | |
111 Message::Params(p) => self.update_params(p), | |
112 Message::Tick(v) => if v == self.ticker {self.tick()}, | |
113 }; | |
114 future::ok::<(),()>(()).boxed() | |
115 } | |
116 | |
117 pub fn update_params(&mut self, p: Params) { | |
118 self.params = p; | |
119 println!("fridge set_params {:?}", self.params); | |
120 | |
121 self.tick(); | |
122 } | |
123 | |
124 pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) { | |
46 self.temp_wort = wort; | 125 self.temp_wort = wort; |
47 self.temp_fridge = fridge; | 126 self.temp_fridge = fridge; |
48 | 127 |
49 if let Some(_) = self.temp_wort { | 128 self.tick(); |
50 // set a new timeout, replacing any existing | |
51 let dur = Duration::new(10, 0); // XXX | |
52 let t = Timeout::new(dur, handle).unwrap(); | |
53 /* | |
54 handle.spawn(t.and_then(|_| { | |
55 self.tick(handle); | |
56 Ok(()) | |
57 }).map_err(|x| ())); | |
58 */ | |
59 self.wortvalid_timeout = Some(t); | |
60 } | |
61 | |
62 self.tick(handle); | |
63 } | 129 } |
64 | 130 |
65 } | 131 } |