comparison rust/src/sensor.rs @ 611:f3e39e2107fd rust

still doesn't compile, improvements to TemplogError and tokio curl though
author Matt Johnston <matt@ucc.asn.au>
date Tue, 28 Feb 2017 22:58:47 +0800
parents 7bda01659426
children c57821a60e51
comparison
equal deleted inserted replaced
610:af0dac00d40b 611:f3e39e2107fd
17 use futures::Stream; 17 use futures::Stream;
18 use types::*; 18 use types::*;
19 use ::Config; 19 use ::Config;
20 20
21 pub trait Sensor { 21 pub trait Sensor {
22 fn stream(&self, handle: &Handle) 22 fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>>;
23 -> Box<Stream<Item=Readings, Error=io::Error>>;
24 } 23 }
25 24
26 #[derive(Clone)] 25 #[derive(Clone)]
27 pub struct OneWireSensor { 26 pub struct OneWireSensor {
28 config: Config, 27 config: Config,
49 48
50 debug!("sensor step {:?}", r); 49 debug!("sensor step {:?}", r);
51 r 50 r
52 } 51 }
53 52
54 fn read_sensor(&self, n: &str) -> Result<f32, Box<Error>> { 53 fn read_sensor(&self, n: &str) -> Result<f32, TemplogError> {
55 lazy_static! { 54 lazy_static! {
56 // multiline 55 // multiline
57 static ref THERM_RE: regex::Regex = regex::Regex::new("(?m).* YES\n.*t=(.*)\n").unwrap(); 56 static ref THERM_RE: regex::Regex = regex::Regex::new("(?m).* YES\n.*t=(.*)\n").unwrap();
58 } 57 }
59 let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR); 58 let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR);
69 })?.as_str(); 68 })?.as_str();
70 69
71 Ok(f32::from_str(v)?) 70 Ok(f32::from_str(v)?)
72 } 71 }
73 72
74 fn sensor_names(&self) -> Result<Vec<String>, Box<Error>> { 73 fn sensor_names(&self) -> Result<Vec<String>, TemplogError> {
75 // TODO: needs to handle multiple busses. 74 // TODO: needs to handle multiple busses.
76 let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR); 75 let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR);
77 path.push("w1_master_slaves"); 76 path.push("w1_master_slaves");
78 77
79 let f = BufReader::new(File::open(path)?); 78 let f = BufReader::new(File::open(path)?);
82 } 81 }
83 } 82 }
84 83
85 impl Sensor for OneWireSensor { 84 impl Sensor for OneWireSensor {
86 85
87 fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=io::Error>> { 86 fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
88 let pool = futures_cpupool::CpuPool::new(4); // TODO: how many? 87 let pool = futures_cpupool::CpuPool::new(4); // TODO: how many?
89 88
90 let dur = Duration::new(self.config.SENSOR_SLEEP,0); 89 let dur = Duration::new(self.config.SENSOR_SLEEP,0);
91 let s = Arc::new(self.clone()); 90 let s = Arc::new(self.clone());
92 Interval::new(dur, handle).unwrap() 91 let i = Interval::new(dur, handle).unwrap()
92 .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
93 .and_then(move |()| { 93 .and_then(move |()| {
94 let a = s.clone(); 94 let a = s.clone();
95 pool.spawn_fn(move || Ok(a.step())) 95 pool.spawn_fn(move || Ok(a.step()))
96 }) 96 });
97 .boxed() 97 consume_errors(i)
98 } 98 }
99 } 99 }
100 100
101 #[derive(Clone)] 101 #[derive(Clone)]
102 pub struct TestSensor { 102 pub struct TestSensor {
116 r.add("wort", Self::try_read("test_wort.txt").unwrap_or_else(|_| 18.0)); 116 r.add("wort", Self::try_read("test_wort.txt").unwrap_or_else(|_| 18.0));
117 r.add("fridge", Self::try_read("test_fridge.txt").unwrap_or_else(|_| 20.0)); 117 r.add("fridge", Self::try_read("test_fridge.txt").unwrap_or_else(|_| 20.0));
118 r 118 r
119 } 119 }
120 120
121 fn try_read(filename: &str) -> Result<f32, Box<Error>> { 121 fn try_read(filename: &str) -> Result<f32, TemplogError> {
122 let mut s = String::new(); 122 let mut s = String::new();
123 File::open(filename)?.read_to_string(&mut s)?; 123 File::open(filename)?.read_to_string(&mut s)?;
124 Ok(s.trim().parse::<f32>()?) 124 Ok(s.trim().parse::<f32>()?)
125 } 125 }
126 } 126 }
127 127
128 impl Sensor for TestSensor { 128 impl Sensor for TestSensor {
129 fn stream(&self, handle: &Handle) 129 fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
130 -> Box<Stream<Item=Readings, Error=io::Error>> {
131 130
132 let dur = Duration::new(self.config.SENSOR_SLEEP,0); 131 let dur = Duration::new(self.config.SENSOR_SLEEP,0);
133 Interval::new(dur, handle).unwrap() 132 let i = Interval::new(dur, handle).unwrap()
133 .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
134 .and_then(move |()| { 134 .and_then(move |()| {
135 Ok(Self::test_step()) 135 Ok(Self::test_step())
136 }).boxed() 136 });
137 consume_errors(i)
137 } 138 }
138 } 139 }
139 140