diff rust/src/sensor.rs @ 603:b45b8b4cf0f5 rust

get rid of lazy_static, config is passed around better use of threadpool for sensors readings are no longer options
author Matt Johnston <matt@ucc.asn.au>
date Thu, 16 Feb 2017 23:19:12 +0800
parents 8c21df3711e2
children 278f1002b5c7
line wrap: on
line diff
--- a/rust/src/sensor.rs	Thu Feb 16 23:17:51 2017 +0800
+++ b/rust/src/sensor.rs	Thu Feb 16 23:19:12 2017 +0800
@@ -8,40 +8,59 @@
 use std::io::{Read,BufReader,BufRead};
 use std::path::PathBuf;
 use std::error::Error;
+use std::sync::Arc;
 
 use tokio_core::reactor::Interval;
 use tokio_core::reactor::Handle;
 use futures::Stream;
 use types::*;
+use ::Config;
 
-use ::rigid_config;
+
 
 pub trait Sensor {
-    fn stream(handle: &Handle)
+    fn stream(&self, handle: &Handle)
         -> Box<Stream<Item=Readings, Error=io::Error>>;
 }
 
+#[derive(Clone)]
 pub struct OneWireSensor {
+    config: Arc<Config>,
 }
 
 impl OneWireSensor {
-    fn new() -> OneWireSensor {
+    pub fn new(config: &Config) -> Self {
         OneWireSensor {
+            config: Arc::new(config.clone()),
         }
     }
 
-    fn step() -> Readings {
+    fn step(&self) -> Readings {
         let mut r = Readings::new();
 
+        if let Ok(names) = self.sensor_names() {
+            for n in &names {
+                match self.read_sensor(n) {
+                    Ok(s) => r.add(n, s),
+                    Err(e) => debug!("Error reading sensors {}: {}", n, e)
+                }
+            }
+        }
 
-        r.add("ambient", Some(31.2));
-        r.add("wort_todo", Some(8.0));
         debug!("sensor step {:?}", r);
         r
     }
 
+    fn read_sensor(&self, n: &str) -> Result<f32, Box<Error>> {
+        let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR);
+        path.push(n);
+        path.push("w1_slave");
+        let f = BufReader::new(File::open(path)?);
+        Ok(3.4)
+    }
+
     fn sensor_names(&self) -> Result<Vec<String>, Box<Error>> {
-        let mut path = PathBuf::from(&rigid_config.SENSOR_BASE_DIR);
+        let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR);
         path.push("w1_master_slaves");
 
         let f = BufReader::new(File::open(path)?);
@@ -50,37 +69,39 @@
     }
 }
 
-// does this need to be static?
-lazy_static! {
-    static ref thread_pool : futures_cpupool::CpuPool = futures_cpupool::CpuPool::new(3); // TODO: how many?
-}
+impl Sensor for OneWireSensor {
+
+    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=io::Error>> {
+        let pool = futures_cpupool::CpuPool::new(4); // TODO: how many?
 
-impl Sensor for OneWireSensor {
-    fn stream(handle: &Handle)
-        -> Box<Stream<Item=Readings, Error=io::Error>> {
-        let mut s = OneWireSensor::new();
-
-        let dur = Duration::new(rigid_config.SENSOR_SLEEP,0);
+        let dur = Duration::new(self.config.SENSOR_SLEEP,0);
+        let s = Arc::new(self.clone());
         Interval::new(dur, handle).unwrap()
-            .and_then(|()| {
-                thread_pool.spawn_fn(|| Ok(Self::step()))
-            }).boxed()
+            .and_then(move |()| {
+                let a = s.clone();
+                pool.spawn_fn(move || Ok(a.step()))
+            })
+            .boxed()
     }
 }
 
+#[derive(Clone)]
 pub struct TestSensor {
+    config: Config,
 }
 
 impl TestSensor {
-    pub fn new() -> Self {
-        TestSensor {}
+    pub fn new(config: &Config) -> Self {
+        TestSensor {
+            config: config.clone(),
+        }
     }
 
-    fn step(&mut self) -> Readings {
+    fn test_step() -> Readings {
         let mut r = Readings::new();
-        r.add("ambient", Some(31.2));
-        r.add("wort", Some(Self::try_read("test_wort.txt").unwrap_or_else(|_| 18.0)));
-        r.add("fridge", Some(Self::try_read("test_fridge.txt").unwrap_or_else(|_| 20.0)));
+        r.add("ambient", 31.2);
+        r.add("wort", Self::try_read("test_wort.txt").unwrap_or_else(|_| 18.0));
+        r.add("fridge", Self::try_read("test_fridge.txt").unwrap_or_else(|_| 20.0));
         r
     }
 
@@ -92,14 +113,14 @@
 }
 
 impl Sensor for TestSensor {
-    fn stream(handle: &Handle)
+    fn stream(&self, handle: &Handle)
         -> Box<Stream<Item=Readings, Error=io::Error>> {
-        let mut s = TestSensor::new();
 
-        let dur = Duration::new(rigid_config.SENSOR_SLEEP,0);
-        Interval::new(dur, handle).unwrap().map(move |()| {
-            s.step()
-        }).boxed()
+        let dur = Duration::new(self.config.SENSOR_SLEEP,0);
+        Interval::new(dur, handle).unwrap()
+            .and_then(move |()| {
+                Ok(Self::test_step())
+            }).boxed()
     }
 }