diff rust/src/sensor.rs @ 611:f3e39e2107fd rust

still doesn't compile, improvements to TemplogError and tokio curl though
author Matt Johnston <matt@ucc.asn.au>
date Tue, 28 Feb 2017 22:58:47 +0800
parents 7bda01659426
children c57821a60e51
line wrap: on
line diff
--- a/rust/src/sensor.rs	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/src/sensor.rs	Tue Feb 28 22:58:47 2017 +0800
@@ -19,8 +19,7 @@
 use ::Config;
 
 pub trait Sensor {
-    fn stream(&self, handle: &Handle)
-        -> Box<Stream<Item=Readings, Error=io::Error>>;
+    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>>;
 }
 
 #[derive(Clone)]
@@ -51,7 +50,7 @@
         r
     }
 
-    fn read_sensor(&self, n: &str) -> Result<f32, Box<Error>> {
+    fn read_sensor(&self, n: &str) -> Result<f32, TemplogError> {
         lazy_static! {
             // multiline
             static ref THERM_RE: regex::Regex = regex::Regex::new("(?m).* YES\n.*t=(.*)\n").unwrap();
@@ -71,7 +70,7 @@
         Ok(f32::from_str(v)?)
     }
 
-    fn sensor_names(&self) -> Result<Vec<String>, Box<Error>> {
+    fn sensor_names(&self) -> Result<Vec<String>, TemplogError> {
         // TODO: needs to handle multiple busses.
         let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR);
         path.push("w1_master_slaves");
@@ -84,17 +83,18 @@
 
 impl Sensor for OneWireSensor {
 
-    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=io::Error>> {
+    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
         let pool = futures_cpupool::CpuPool::new(4); // TODO: how many?
 
         let dur = Duration::new(self.config.SENSOR_SLEEP,0);
         let s = Arc::new(self.clone());
-        Interval::new(dur, handle).unwrap()
+        let i = Interval::new(dur, handle).unwrap()
+            .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
             .and_then(move |()| {
                 let a = s.clone();
                 pool.spawn_fn(move || Ok(a.step()))
-            })
-            .boxed()
+            });
+        consume_errors(i)
     }
 }
 
@@ -118,7 +118,7 @@
         r
     }
 
-    fn try_read(filename: &str) -> Result<f32, Box<Error>> {
+    fn try_read(filename: &str) -> Result<f32, TemplogError> {
         let mut s = String::new();
         File::open(filename)?.read_to_string(&mut s)?;
         Ok(s.trim().parse::<f32>()?)
@@ -126,14 +126,15 @@
 }
 
 impl Sensor for TestSensor {
-    fn stream(&self, handle: &Handle)
-        -> Box<Stream<Item=Readings, Error=io::Error>> {
+    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
 
         let dur = Duration::new(self.config.SENSOR_SLEEP,0);
-        Interval::new(dur, handle).unwrap()
+        let i = Interval::new(dur, handle).unwrap()
+            .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
             .and_then(move |()| {
                 Ok(Self::test_step())
-            }).boxed()
+            });
+        consume_errors(i)
     }
 }