comparison rust/src/main.rs @ 594:aff50ee77252 rust

rust working better now with streams and sinks.
author Matt Johnston <matt@ucc.asn.au>
date Wed, 04 Jan 2017 17:18:44 +0800
parents bf138339d20a
children e87655ed8429
comparison
equal deleted inserted replaced
593:bf138339d20a 594:aff50ee77252
1 extern crate tokio_core; 1 extern crate tokio_core;
2 extern crate futures; 2 extern crate futures;
3 extern crate rustc_serialize; 3 extern crate rustc_serialize;
4 #[macro_use]
5 extern crate log;
6 extern crate env_logger;
7
8 use std::io;
4 9
5 use tokio_core::reactor::Core; 10 use tokio_core::reactor::Core;
6 use futures::{Future,Stream}; 11 use futures::{Stream,Sink,Future};
7 use rustc_serialize::json; 12 use futures::sync::{mpsc};
13 use sensor::Sensor;
8 14
9 mod sensor; 15 mod sensor;
10 mod fridge; 16 mod fridge;
11 mod types; 17 mod types;
12 mod paramwaiter; 18 mod paramwaiter;
13 19
14 use types::*; 20 use types::*;
15 21
16 fn main() { 22 fn main() {
23 env_logger::init().unwrap();
24
17 println!("Wort Templog"); 25 println!("Wort Templog");
18 26 debug!("debug log level");
19 let mut paramh = ParamHolder::new();
20 let mut readings = Readings::new();
21 let mut fridge = fridge::Fridge::new(paramh.p.clone());
22 27
23 let mut core = Core::new().unwrap(); 28 let mut core = Core::new().unwrap();
24 let handle = core.handle(); 29 let handle = core.handle();
25 30
26 let s = sensor::Sensor::run(&handle, 400, "sens1".to_string()); 31 let mut paramh = ParamHolder::new();
27 let w = paramwaiter::ParamWaiter::run(&handle, 3000); 32 let mut fridge = fridge::Fridge::new(paramh.p, &handle);
28 33
29 let h = s.for_each(move |r| { 34 let (sensor_s, sensor_r) = mpsc::channel(1);
30 readings.push(r); 35 let sensor_r = sensor_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with sensor_r channel"));
31 println!("readings {:?}", readings); 36
32 Ok(()) 37 let sensor_stream = if cfg!(feature = "testmode") {
38 sensor::TestSensor::stream(&handle)
39 } else {
40 sensor::OneWireSensor::stream(&handle)
41 };
42
43 // Send the sensors of interest to the fridge (sensor_s),
44 // while streaming them all to the web sender.
45 let s = sensor_stream.map(|r| {
46 debug!("sensors {:?}", r);
47 let t = sensor_s.clone().send(fridge::Message::Sensor{wort: r.wort(), fridge: r.fridge()})
48 .map(|_| ())
49 .map_err(|_| ());
50 handle.spawn(t);
51 r
33 }); 52 });
34 53
35 let j = w.for_each(move |p| { 54 let param_stream = paramwaiter::ParamWaiter::stream(&handle);
36 fridge.set_params(&handle, p.clone()); 55 let p = param_stream.map(|p| {
37 paramh.p = p; 56 fridge::Message::Params(p)
38 Ok(()) 57 });
39 });
40 58
41 handle.spawn(h.map_err(|x| ())); 59 let timeouts = fridge.timeouts();
42 handle.spawn(j.map_err(|x| ()));
43 60
44 let forever = futures::empty::<(),()>(); 61 let all_readings = s.for_each(|_| Ok(()));
45 core.run(forever); 62 let all_fridge = p.select(timeouts).select(sensor_r).forward(fridge)
63 .map(|_| () );
64
65 let all = all_fridge.select(all_readings);
66
67 core.run(all).ok();
46 } 68 }
47 69