diff rust/src/main.rs @ 594:aff50ee77252 rust

rust working better now with streams and sinks.
author Matt Johnston <matt@ucc.asn.au>
date Wed, 04 Jan 2017 17:18:44 +0800
parents bf138339d20a
children e87655ed8429
line wrap: on
line diff
--- a/rust/src/main.rs	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/src/main.rs	Wed Jan 04 17:18:44 2017 +0800
@@ -1,10 +1,16 @@
 extern crate tokio_core;
 extern crate futures;
 extern crate rustc_serialize;
+#[macro_use]
+extern crate log;
+extern crate env_logger;
+
+use std::io;
 
 use tokio_core::reactor::Core;
-use futures::{Future,Stream};
-use rustc_serialize::json;
+use futures::{Stream,Sink,Future};
+use futures::sync::{mpsc};
+use sensor::Sensor;
 
 mod sensor;
 mod fridge;
@@ -14,34 +20,50 @@
 use types::*;
 
 fn main() {
-    println!("Wort Templog");
+    env_logger::init().unwrap();
 
-    let mut paramh = ParamHolder::new();
-    let mut readings = Readings::new();
-    let mut fridge = fridge::Fridge::new(paramh.p.clone());
+    println!("Wort Templog");
+    debug!("debug log level");
 
     let mut core = Core::new().unwrap();
     let handle = core.handle();
 
-    let s = sensor::Sensor::run(&handle, 400, "sens1".to_string());
-    let w = paramwaiter::ParamWaiter::run(&handle, 3000);
+    let mut paramh = ParamHolder::new();
+    let mut fridge = fridge::Fridge::new(paramh.p, &handle);
+
+    let (sensor_s, sensor_r) = mpsc::channel(1);
+    let sensor_r = sensor_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with sensor_r channel"));
 
-    let h = s.for_each(move |r| {
-        readings.push(r);
-        println!("readings {:?}", readings);
-        Ok(())
+    let sensor_stream = if cfg!(feature = "testmode") {
+        sensor::TestSensor::stream(&handle)
+    } else {
+        sensor::OneWireSensor::stream(&handle)
+    };
+
+    // Send the sensors of interest to the fridge (sensor_s),
+    // while streaming them all to the web sender.
+    let s = sensor_stream.map(|r| {
+        debug!("sensors {:?}", r);
+        let t = sensor_s.clone().send(fridge::Message::Sensor{wort: r.wort(), fridge: r.fridge()})
+                    .map(|_| ())
+                    .map_err(|_| ());
+        handle.spawn(t);
+        r
     });
 
-    let j = w.for_each(move |p| {
-        fridge.set_params(&handle, p.clone());
-        paramh.p = p;
-        Ok(())
-    });
+    let param_stream = paramwaiter::ParamWaiter::stream(&handle);
+    let p = param_stream.map(|p| {
+            fridge::Message::Params(p)
+        });
+
+    let timeouts = fridge.timeouts();
 
-    handle.spawn(h.map_err(|x| ()));
-    handle.spawn(j.map_err(|x| ()));
+    let all_readings = s.for_each(|_| Ok(()));
+    let all_fridge = p.select(timeouts).select(sensor_r).forward(fridge)
+        .map(|_| () );
 
-    let forever = futures::empty::<(),()>();
-    core.run(forever);
+    let all = all_fridge.select(all_readings);
+
+    core.run(all).ok();
 }