Mercurial > templog
changeset 594:aff50ee77252 rust
rust working better now with streams and sinks.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Jan 2017 17:18:44 +0800 |
parents | bf138339d20a |
children | e87655ed8429 |
files | .hgignore rust/Cargo.lock rust/Cargo.toml rust/src/fridge.rs rust/src/main.rs rust/src/paramwaiter.rs rust/src/sensor.rs rust/src/types.rs |
diffstat | 8 files changed, 328 insertions(+), 111 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Tue Dec 27 00:51:28 2016 +0800 +++ b/.hgignore Wed Jan 04 17:18:44 2017 +0800 @@ -1,1 +1,2 @@ venv +rust/target
--- a/rust/Cargo.lock Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/Cargo.lock Wed Jan 04 17:18:44 2017 +0800 @@ -2,10 +2,20 @@ name = "wort-templog" version = "0.1.0" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "aho-corasick" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -19,8 +29,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "env_logger" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "futures" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -51,15 +70,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "memchr" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "mio" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -68,7 +95,7 @@ [[package]] name = "miow" -version = "0.1.3" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -111,6 +138,23 @@ ] [[package]] +name = "regex" +version = "0.1.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "regex-syntax" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "rustc-serialize" version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -139,18 +183,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] -name = "tokio-core" -version = "0.1.1" +name = "thread-id" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thread_local" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-core" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] +name = "utf8-ranges" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "void" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -175,24 +241,32 @@ ] [metadata] +"checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" "checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3" "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" -"checksum futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0bad0a2ac64b227fdc10c254051ae5af542cf19c9328704fd4092f7914196897" +"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" +"checksum futures 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "177a82a61dd7e528022ce97f24e54b499dd2fee4d4646a0f283c5fb500dbfe20" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" "checksum libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "a51822fc847e7a8101514d1d44e354ba2ffa7d4c194dcab48870740e327cac70" "checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054" -"checksum mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "410a1a0ff76f5a226f1e4e3ff1756128e65cd30166e39c3892283e2ac09d5b67" -"checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a" +"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20" +"checksum mio 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5b493dc9fd96bd2077f2117f178172b0765db4dfda3ea4d8000401e6d65d3e80" +"checksum miow 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3e690c5df6b2f60acd45d56378981e827ff8295562fc8d34f573deb267a59cd1" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" "checksum nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a0d95c5fa8b641c10ad0b8887454ebaafa3c92b5cd5350f8fc693adafd178e7b" "checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d" +"checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f" +"checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957" "checksum rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "237546c689f20bb44980270c73c3b9edd0891c1be49cc1274406134a66d3957b" "checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084" "checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d" "checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" -"checksum tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "06f40e15561569e24dab3dcf270c0bb950195b84dbed591dfb6591e28c9b9cff" +"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03" +"checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5" +"checksum tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "52416b3e937abac22a543a7f1c66bd37feb60137ff1ab42390fa02df85347e58" +"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
--- a/rust/Cargo.toml Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/Cargo.toml Wed Jan 04 17:18:44 2017 +0800 @@ -4,7 +4,12 @@ authors = ["Matt Johnston <[email protected]>"] [dependencies] -futures = "0.1.6" -tokio-core = "0.1.1" +futures = "0.1" +tokio-core = "0.1" rustc-serialize = "0.3" rand = "0.3" +log = "0.3" +env_logger = "0.3" + +[features] +testmode = []
--- a/rust/src/fridge.rs Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/src/fridge.rs Wed Jan 04 17:18:44 2017 +0800 @@ -1,65 +1,131 @@ +extern crate futures; +extern crate tokio_core; + +use std; use std::io; +use std::mem; use std::time::Duration; -use futures::Future; -use tokio_core::reactor::Timeout; -use tokio_core::reactor::Handle; +use futures::{Future,future,Sink,Stream}; +use tokio_core::reactor::{Timeout,Handle}; +use futures::sync::{mpsc}; use types::*; +#[derive(Debug)] +pub enum Message { + Sensor {wort: Option<f32>, fridge: Option<f32>}, + Params (Params), + Tick(u64), +} + pub struct Fridge { params: Params, temp_wort: Option<f32>, temp_fridge: Option<f32>, - // timeouts to wake ourself up again - //overshoot_timeout: Option<Future<Item=(), Error=io::Error>>, - //fridgeoff_timeout: Option<Future<Item=(), Error=io::Error>>, - wortvalid_timeout: Option<Timeout>, + // Timeouts to wake ourselves up again + handle: Handle, + timeout_s: mpsc::Sender<u64>, + timeout_r: Option<mpsc::Receiver<u64>>, + ticker: u64, +} + +impl Sink for Fridge { + + type SinkItem = Message; + type SinkError = std::io::Error; + + fn start_send(&mut self, msg: Message) + -> futures::StartSend<Self::SinkItem, Self::SinkError> { + self.process_msg(msg); + Ok(futures::AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> { + Ok(futures::Async::Ready(())) + } } impl Fridge { - pub fn new(p: Params) -> Fridge { - Fridge { + pub fn new(p: Params, handle: &Handle) -> Fridge { + let (s, r) = mpsc::channel(1); + let mut f = Fridge { params: p, temp_wort: None, temp_fridge: None, - //overshoot_timeout: None, - //fridgeoff_timeout: None, - wortvalid_timeout: None, - } + + handle: handle.clone(), + timeout_s: s, + timeout_r: Some(r), + ticker: 0, + }; + f.tick(); + f } - fn tick(&mut self, handle: &Handle) { + /// Returns a stream of timeouts for fridge, waking when next necessary + pub fn timeouts(&mut self) + -> Box<Stream<Item=Message, Error=io::Error>> { + mem::replace(&mut self.timeout_r, None) + .expect("NumberWatcher::timeouts() can only be called once") + .map(|v| Message::Tick(v)) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel")) + .boxed() + } + fn next_wakeup(&self) -> Duration { + let millis = 400; + let dur = Duration::from_millis(millis); + dur + } + + fn tick(&mut self) { + debug!("tick"); + + self.send_next_timeout(); } - pub fn set_params(&mut self, handle: &Handle, p: Params) { - self.params = p; - println!("params {:?}", self.params); - - self.tick(handle); + /// Sets the next self-wakeup timeout + fn send_next_timeout(&mut self) { + let waker = self.timeout_s.clone(); + let dur = self.next_wakeup(); + debug!("fridge next wakeup {:?}", dur); + self.ticker += 1; + let v = self.ticker; + let t = Timeout::new(dur, &self.handle).unwrap() + .map_err(|_| ()) + .and_then(move |_| { + waker.send(v) + .map_err(|_| ()) + }) + .map(|_| ()); + self.handle.spawn(t); } - pub fn set_temps(&mut self, handle: &Handle, - wort: Option<f32>, fridge: Option<f32>) { + fn process_msg(&mut self, msg: Message) + -> Box<Future<Item=(), Error=()>> { + debug!("process_msg {:?}", msg); + match msg { + Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge), + Message::Params(p) => self.update_params(p), + Message::Tick(v) => if v == self.ticker {self.tick()}, + }; + future::ok::<(),()>(()).boxed() + } + + pub fn update_params(&mut self, p: Params) { + self.params = p; + println!("fridge set_params {:?}", self.params); + + self.tick(); + } + + pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) { self.temp_wort = wort; self.temp_fridge = fridge; - if let Some(_) = self.temp_wort { - // set a new timeout, replacing any existing - let dur = Duration::new(10, 0); // XXX - let t = Timeout::new(dur, handle).unwrap(); - /* - handle.spawn(t.and_then(|_| { - self.tick(handle); - Ok(()) - }).map_err(|x| ())); - */ - self.wortvalid_timeout = Some(t); - } - - self.tick(handle); + self.tick(); } }
--- a/rust/src/main.rs Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/src/main.rs Wed Jan 04 17:18:44 2017 +0800 @@ -1,10 +1,16 @@ extern crate tokio_core; extern crate futures; extern crate rustc_serialize; +#[macro_use] +extern crate log; +extern crate env_logger; + +use std::io; use tokio_core::reactor::Core; -use futures::{Future,Stream}; -use rustc_serialize::json; +use futures::{Stream,Sink,Future}; +use futures::sync::{mpsc}; +use sensor::Sensor; mod sensor; mod fridge; @@ -14,34 +20,50 @@ use types::*; fn main() { - println!("Wort Templog"); + env_logger::init().unwrap(); - let mut paramh = ParamHolder::new(); - let mut readings = Readings::new(); - let mut fridge = fridge::Fridge::new(paramh.p.clone()); + println!("Wort Templog"); + debug!("debug log level"); let mut core = Core::new().unwrap(); let handle = core.handle(); - let s = sensor::Sensor::run(&handle, 400, "sens1".to_string()); - let w = paramwaiter::ParamWaiter::run(&handle, 3000); + let mut paramh = ParamHolder::new(); + let mut fridge = fridge::Fridge::new(paramh.p, &handle); + + let (sensor_s, sensor_r) = mpsc::channel(1); + let sensor_r = sensor_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with sensor_r channel")); - let h = s.for_each(move |r| { - readings.push(r); - println!("readings {:?}", readings); - Ok(()) + let sensor_stream = if cfg!(feature = "testmode") { + sensor::TestSensor::stream(&handle) + } else { + sensor::OneWireSensor::stream(&handle) + }; + + // Send the sensors of interest to the fridge (sensor_s), + // while streaming them all to the web sender. + let s = sensor_stream.map(|r| { + debug!("sensors {:?}", r); + let t = sensor_s.clone().send(fridge::Message::Sensor{wort: r.wort(), fridge: r.fridge()}) + .map(|_| ()) + .map_err(|_| ()); + handle.spawn(t); + r }); - let j = w.for_each(move |p| { - fridge.set_params(&handle, p.clone()); - paramh.p = p; - Ok(()) - }); + let param_stream = paramwaiter::ParamWaiter::stream(&handle); + let p = param_stream.map(|p| { + fridge::Message::Params(p) + }); + + let timeouts = fridge.timeouts(); - handle.spawn(h.map_err(|x| ())); - handle.spawn(j.map_err(|x| ())); + let all_readings = s.for_each(|_| Ok(())); + let all_fridge = p.select(timeouts).select(sensor_r).forward(fridge) + .map(|_| () ); - let forever = futures::empty::<(),()>(); - core.run(forever); + let all = all_fridge.select(all_readings); + + core.run(all).ok(); }
--- a/rust/src/paramwaiter.rs Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/src/paramwaiter.rs Wed Jan 04 17:18:44 2017 +0800 @@ -16,7 +16,6 @@ impl ParamWaiter { fn step(&mut self) -> Params { let mut p = Params::defaults(); - let mut rng = rand::thread_rng(); p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); p } @@ -25,10 +24,10 @@ ParamWaiter {} } - pub fn run(handle: &Handle, rate: u64) -> Box<Stream<Item=Params, Error=io::Error>> { + pub fn stream(handle: &Handle) -> Box<Stream<Item=Params, Error=io::Error>> { let mut s = ParamWaiter::new(); - let dur = Duration::from_millis(rate); + let dur = Duration::from_millis(4000); Interval::new(dur, handle).unwrap().map(move |()| { s.step() }).boxed()
--- a/rust/src/sensor.rs Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/src/sensor.rs Wed Jan 04 17:18:44 2017 +0800 @@ -3,37 +3,86 @@ use std::time::Duration; use std::io; +use std::fs::File; +use std::io::Read; use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; use futures::Stream; use types::*; -pub struct Sensor { - current: f32, - suf: String, +pub trait Sensor { + fn stream(handle: &Handle) + -> Box<Stream<Item=Readings, Error=io::Error>>; +} + +pub struct OneWireSensor { } -impl Sensor { - fn step(&mut self) -> Vec<Reading> { - let mut r = Vec::new(); - self.current = self.current + 0.1; - r.push(Reading::new("aaa".to_string() + &self.suf, self.current)); - r.push(Reading::new("b".to_string() + &self.suf, self.current/3.0)); - r +impl OneWireSensor { + fn new() -> OneWireSensor { + OneWireSensor {} + // todo } - fn new(suffix: String) -> Self { - Sensor { current: 22.0, suf: suffix } + fn step(&mut self) -> Readings { + let mut r = Readings::new(); + r.add("ambient", Some(31.2)); + r.add("wort_todo", Some(8.0)); + debug!("sensor step {:?}", r); + r } +} - pub fn run(handle: &Handle, rate: u64, suffix: String) -> Box<Stream<Item=Vec<Reading>, Error=io::Error>> { - let mut s = Sensor::new(suffix); +impl Sensor for OneWireSensor { + fn stream(handle: &Handle) + -> Box<Stream<Item=Readings, Error=io::Error>> { + let mut s = OneWireSensor::new(); - let dur = Duration::from_millis(rate); + let dur = Duration::from_millis(600); Interval::new(dur, handle).unwrap().map(move |()| { s.step() }).boxed() } } +pub struct TestSensor { +} + +impl TestSensor { + fn step(&mut self) -> Readings { + let mut r = Readings::new(); + r.add("ambient", Some(31.2)); + r.add("wort", Some(Self::try_read("test_wort.txt").unwrap_or_else(|_| 18.0))); + r.add("fridge", Some(Self::try_read("test_fridge.txt").unwrap_or_else(|_| 20.0))); + r + } + + fn try_read(filename: &str) -> Result<f32, String> { + File::open(filename) + .map_err(|e| e.to_string()) + .and_then(|mut f| { + let mut s = String::new(); + f.read_to_string(&mut s) + .map_err(|e| e.to_string()) + .map(|_| s) + }) + .and_then(|s| { + s.trim().parse::<f32>() + .map_err(|e| e.to_string()) + }) + } +} + +impl Sensor for TestSensor { + fn stream(handle: &Handle) + -> Box<Stream<Item=Readings, Error=io::Error>> { + let mut s = TestSensor {}; + + let dur = Duration::new(1,0); + Interval::new(dur, handle).unwrap().map(move |()| { + s.step() + }).boxed() + } +} +
--- a/rust/src/types.rs Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/src/types.rs Wed Jan 04 17:18:44 2017 +0800 @@ -1,3 +1,6 @@ +use std::collections::HashMap; +use std::time::Duration; + #[derive(RustcDecodable, RustcEncodable, Debug, Clone)] pub struct Params { pub fridge_setpoint: f32, @@ -41,42 +44,40 @@ } #[derive(Debug)] -pub struct Reading { - name: String, - value: Option<f32>, -} - -impl Reading { - pub fn new(name: String, value: f32) -> Reading { - Reading { name: name, value: Some(value) } - } - pub fn new_none(name: String) -> Reading { - Reading { name: name, value: None } - } -} - -#[derive(Debug)] pub struct Readings { - temps: Vec<Vec<Reading>>, + temps: HashMap<String, Option<f32>>, } impl Readings { pub fn new() -> Readings { Readings { - temps: Vec::new(), + temps: HashMap::new(), } } - pub fn push(&mut self, vals: Vec<Reading>) { - self.temps.push(vals); + pub fn add(&mut self, name: &str, v: Option<f32>) { + if let Some(prev) = self.temps.insert(name.to_string(), v) { + warn!("Replaced existing reading '{}' {:?} -> {:?}", + name, prev, v); + } } pub fn fridge(&self) -> Option<f32> { - unimplemented!(); + if let Some(t) = self.temps.get("fridge") { + t.clone() + } else { + warn!("No fridge reading was added"); + None + } } pub fn wort(&self) -> Option<f32> { - unimplemented!(); + if let Some(t) = self.temps.get("wort") { + t.clone() + } else { + warn!("No wort reading was added"); + None + } } }