Mercurial > templog
diff rust/src/sensor.rs @ 633:490e9e15b98c rust
move some bits to riker
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Sep 2019 23:24:13 +0800 |
parents | bde302def78e |
children | a5721c02d3ee |
line wrap: on
line diff
--- a/rust/src/sensor.rs Thu Aug 22 23:59:50 2019 +0800 +++ b/rust/src/sensor.rs Wed Sep 04 23:24:13 2019 +0800 @@ -13,21 +13,39 @@ pub struct OneWireSensor { config: Config, - fridge: Recipient<Readings>, + chan: ChannelRef<Readings>, +} + +struct SendReading; + +trait Sensor { } -impl Actor for OneWireSensor { - type Context = Context<Self>; +impl Actor for dyn Sensor { + type Msg = SendReading; + + fn recv(&mut self, + ctx: &Context<Self::Msg>, + msg: Self::Msg, + sender: Sender) { + self.chan.tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None); + } + + fn post_start(&mut self, ctx: &Context<Self::Msg>) { + self.chan = channel("readings", &ctx.system).unwrap(); + let dur = Duration::new(self.config.SENSOR_SLEEP,0); + ctx.schedule(Duration::from_millis(0), dur, self, None, SendReading); + } } impl OneWireSensor { - pub fn new(config: &Config, fridge: Recipient<Readings>) -> Self { + pub fn new(config: &Config) -> Self { OneWireSensor { config: config.clone(), } } - fn step(&self) -> Readings { + fn get_readings(&self) -> Readings { let mut r = Readings::new(); if let Ok(names) = self.sensor_names() { @@ -75,20 +93,6 @@ } impl Sensor for OneWireSensor { - - fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> { - let pool = futures_cpupool::CpuPool::new(4); // TODO: how many? - - let dur = Duration::new(self.config.SENSOR_SLEEP,0); - let s = Arc::new(self.clone()); - let i = Interval::new(dur, handle).unwrap() - .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e)) - .and_then(move |()| { - let a = s.clone(); - pool.spawn_fn(move || Ok(a.step())) - }); - consume_errors(i) - } } #[derive(Clone)] @@ -119,15 +123,5 @@ } impl Sensor for TestSensor { - fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> { - - let dur = Duration::new(self.config.SENSOR_SLEEP,0); - let i = Interval::new(dur, handle).unwrap() - .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e)) - .and_then(move |()| { - Ok(Self::test_step()) - }); - consume_errors(i) - } }