Mercurial > templog
comparison rust/src/main.rs @ 594:aff50ee77252 rust
rust working better now with streams and sinks.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Jan 2017 17:18:44 +0800 |
parents | bf138339d20a |
children | e87655ed8429 |
comparison
legend (row states): equal | deleted | inserted | replaced
593:bf138339d20a | 594:aff50ee77252 |
---|---|
1 extern crate tokio_core; | 1 extern crate tokio_core; |
2 extern crate futures; | 2 extern crate futures; |
3 extern crate rustc_serialize; | 3 extern crate rustc_serialize; |
4 #[macro_use] | |
5 extern crate log; | |
6 extern crate env_logger; | |
7 | |
8 use std::io; | |
4 | 9 |
5 use tokio_core::reactor::Core; | 10 use tokio_core::reactor::Core; |
6 use futures::{Future,Stream}; | 11 use futures::{Stream,Sink,Future}; |
7 use rustc_serialize::json; | 12 use futures::sync::{mpsc}; |
13 use sensor::Sensor; | |
8 | 14 |
9 mod sensor; | 15 mod sensor; |
10 mod fridge; | 16 mod fridge; |
11 mod types; | 17 mod types; |
12 mod paramwaiter; | 18 mod paramwaiter; |
13 | 19 |
14 use types::*; | 20 use types::*; |
15 | 21 |
16 fn main() { | 22 fn main() { |
23 env_logger::init().unwrap(); | |
24 | |
17 println!("Wort Templog"); | 25 println!("Wort Templog"); |
18 | 26 debug!("debug log level"); |
19 let mut paramh = ParamHolder::new(); | |
20 let mut readings = Readings::new(); | |
21 let mut fridge = fridge::Fridge::new(paramh.p.clone()); | |
22 | 27 |
23 let mut core = Core::new().unwrap(); | 28 let mut core = Core::new().unwrap(); |
24 let handle = core.handle(); | 29 let handle = core.handle(); |
25 | 30 |
26 let s = sensor::Sensor::run(&handle, 400, "sens1".to_string()); | 31 let mut paramh = ParamHolder::new(); |
27 let w = paramwaiter::ParamWaiter::run(&handle, 3000); | 32 let mut fridge = fridge::Fridge::new(paramh.p, &handle); |
28 | 33 |
29 let h = s.for_each(move |r| { | 34 let (sensor_s, sensor_r) = mpsc::channel(1); |
30 readings.push(r); | 35 let sensor_r = sensor_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with sensor_r channel")); |
31 println!("readings {:?}", readings); | 36 |
32 Ok(()) | 37 let sensor_stream = if cfg!(feature = "testmode") { |
38 sensor::TestSensor::stream(&handle) | |
39 } else { | |
40 sensor::OneWireSensor::stream(&handle) | |
41 }; | |
42 | |
43 // Send the sensors of interest to the fridge (sensor_s), | |
44 // while streaming them all to the web sender. | |
45 let s = sensor_stream.map(|r| { | |
46 debug!("sensors {:?}", r); | |
47 let t = sensor_s.clone().send(fridge::Message::Sensor{wort: r.wort(), fridge: r.fridge()}) | |
48 .map(|_| ()) | |
49 .map_err(|_| ()); | |
50 handle.spawn(t); | |
51 r | |
33 }); | 52 }); |
34 | 53 |
35 let j = w.for_each(move |p| { | 54 let param_stream = paramwaiter::ParamWaiter::stream(&handle); |
36 fridge.set_params(&handle, p.clone()); | 55 let p = param_stream.map(|p| { |
37 paramh.p = p; | 56 fridge::Message::Params(p) |
38 Ok(()) | 57 }); |
39 }); | |
40 | 58 |
41 handle.spawn(h.map_err(|x| ())); | 59 let timeouts = fridge.timeouts(); |
42 handle.spawn(j.map_err(|x| ())); | |
43 | 60 |
44 let forever = futures::empty::<(),()>(); | 61 let all_readings = s.for_each(|_| Ok(())); |
45 core.run(forever); | 62 let all_fridge = p.select(timeouts).select(sensor_r).forward(fridge) |
63 .map(|_| () ); | |
64 | |
65 let all = all_fridge.select(all_readings); | |
66 | |
67 core.run(all).ok(); | |
46 } | 68 } |
47 | 69 |