Mercurial > templog
diff rust/src/main.rs @ 594:aff50ee77252 rust
rust working better now with streams and sinks.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Jan 2017 17:18:44 +0800 |
parents | bf138339d20a |
children | e87655ed8429 |
line wrap: on
line diff
--- a/rust/src/main.rs Tue Dec 27 00:51:28 2016 +0800 +++ b/rust/src/main.rs Wed Jan 04 17:18:44 2017 +0800 @@ -1,10 +1,16 @@ extern crate tokio_core; extern crate futures; extern crate rustc_serialize; +#[macro_use] +extern crate log; +extern crate env_logger; + +use std::io; use tokio_core::reactor::Core; -use futures::{Future,Stream}; -use rustc_serialize::json; +use futures::{Stream,Sink,Future}; +use futures::sync::{mpsc}; +use sensor::Sensor; mod sensor; mod fridge; @@ -14,34 +20,50 @@ use types::*; fn main() { - println!("Wort Templog"); + env_logger::init().unwrap(); - let mut paramh = ParamHolder::new(); - let mut readings = Readings::new(); - let mut fridge = fridge::Fridge::new(paramh.p.clone()); + println!("Wort Templog"); + debug!("debug log level"); let mut core = Core::new().unwrap(); let handle = core.handle(); - let s = sensor::Sensor::run(&handle, 400, "sens1".to_string()); - let w = paramwaiter::ParamWaiter::run(&handle, 3000); + let mut paramh = ParamHolder::new(); + let mut fridge = fridge::Fridge::new(paramh.p, &handle); + + let (sensor_s, sensor_r) = mpsc::channel(1); + let sensor_r = sensor_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with sensor_r channel")); - let h = s.for_each(move |r| { - readings.push(r); - println!("readings {:?}", readings); - Ok(()) + let sensor_stream = if cfg!(feature = "testmode") { + sensor::TestSensor::stream(&handle) + } else { + sensor::OneWireSensor::stream(&handle) + }; + + // Send the sensors of interest to the fridge (sensor_s), + // while streaming them all to the web sender. + let s = sensor_stream.map(|r| { + debug!("sensors {:?}", r); + let t = sensor_s.clone().send(fridge::Message::Sensor{wort: r.wort(), fridge: r.fridge()}) + .map(|_| ()) + .map_err(|_| ()); + handle.spawn(t); + r }); - let j = w.for_each(move |p| { - fridge.set_params(&handle, p.clone()); - paramh.p = p; - Ok(()) - }); + let param_stream = paramwaiter::ParamWaiter::stream(&handle); + let p = param_stream.map(|p| { + fridge::Message::Params(p) + }); + + let timeouts = fridge.timeouts(); - handle.spawn(h.map_err(|x| ())); - handle.spawn(j.map_err(|x| ())); + let all_readings = s.for_each(|_| Ok(())); + let all_fridge = p.select(timeouts).select(sensor_r).forward(fridge) + .map(|_| () ); - let forever = futures::empty::<(),()>(); - core.run(forever); + let all = all_fridge.select(all_readings); + + core.run(all).ok(); }