comparison rust/src/sensor.rs @ 633:490e9e15b98c rust

move some bits to riker
author Matt Johnston <matt@ucc.asn.au>
date Wed, 04 Sep 2019 23:24:13 +0800
parents bde302def78e
children a5721c02d3ee
comparison
equal deleted inserted replaced
632:bde302def78e 633:490e9e15b98c
11 use super::types::*; 11 use super::types::*;
12 use super::config::Config; 12 use super::config::Config;
13 13
14 pub struct OneWireSensor { 14 pub struct OneWireSensor {
15 config: Config, 15 config: Config,
16 fridge: Recipient<Readings>, 16 chan: ChannelRef<Readings>,
17 } 17 }
18 18
19 impl Actor for OneWireSensor { 19 struct SendReading;
20 type Context = Context<Self>; 20
21 trait Sensor {
22 }
23
24 impl Actor for dyn Sensor {
25 type Msg = SendReading;
26
27 fn recv(&mut self,
28 ctx: &Context<Self::Msg>,
29 msg: Self::Msg,
30 sender: Sender) {
31 self.chan.tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None);
32 }
33
34 fn post_start(&mut self, ctx: &Context<Self::Msg>) {
35 self.chan = channel("readings", &ctx.system).unwrap();
36 let dur = Duration::new(self.config.SENSOR_SLEEP,0);
37 ctx.schedule(Duration::from_millis(0), dur, self, None, SendReading);
38 }
21 } 39 }
22 40
23 impl OneWireSensor { 41 impl OneWireSensor {
24 pub fn new(config: &Config, fridge: Recipient<Readings>) -> Self { 42 pub fn new(config: &Config) -> Self {
25 OneWireSensor { 43 OneWireSensor {
26 config: config.clone(), 44 config: config.clone(),
27 } 45 }
28 } 46 }
29 47
30 fn step(&self) -> Readings { 48 fn get_readings(&self) -> Readings {
31 let mut r = Readings::new(); 49 let mut r = Readings::new();
32 50
33 if let Ok(names) = self.sensor_names() { 51 if let Ok(names) = self.sensor_names() {
34 for n in &names { 52 for n in &names {
35 match self.read_sensor(n) { 53 match self.read_sensor(n) {
73 Ok(s) 91 Ok(s)
74 } 92 }
75 } 93 }
76 94
77 impl Sensor for OneWireSensor { 95 impl Sensor for OneWireSensor {
78
79 fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
80 let pool = futures_cpupool::CpuPool::new(4); // TODO: how many?
81
82 let dur = Duration::new(self.config.SENSOR_SLEEP,0);
83 let s = Arc::new(self.clone());
84 let i = Interval::new(dur, handle).unwrap()
85 .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
86 .and_then(move |()| {
87 let a = s.clone();
88 pool.spawn_fn(move || Ok(a.step()))
89 });
90 consume_errors(i)
91 }
92 } 96 }
93 97
94 #[derive(Clone)] 98 #[derive(Clone)]
95 pub struct TestSensor { 99 pub struct TestSensor {
96 config: Config, 100 config: Config,
117 Ok(s.trim().parse::<f32>()?) 121 Ok(s.trim().parse::<f32>()?)
118 } 122 }
119 } 123 }
120 124
121 impl Sensor for TestSensor { 125 impl Sensor for TestSensor {
122 fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
123
124 let dur = Duration::new(self.config.SENSOR_SLEEP,0);
125 let i = Interval::new(dur, handle).unwrap()
126 .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
127 .and_then(move |()| {
128 Ok(Self::test_step())
129 });
130 consume_errors(i)
131 }
132 } 126 }
133 127