# HG changeset patch # User Matt Johnston # Date 1573270559 -28800 # Node ID a9f353f488d0214d4f432213edc33975adf3ccc2 # Parent 1e147b3c2c558013dc4a7f1a8ee8c1702d9655b6 fix channels diff -r 1e147b3c2c55 -r a9f353f488d0 rust/src/fridge.rs --- a/rust/src/fridge.rs Sat Nov 09 11:35:43 2019 +0800 +++ b/rust/src/fridge.rs Sat Nov 09 11:35:59 2019 +0800 @@ -11,8 +11,10 @@ #[cfg(target_os = "linux")] use self::sysfs_gpio::{Direction, Pin}; +use crate::params::Params; use super::config::Config; -use super::params::Params; +use super::params; +use super::sensor; use super::types::*; #[derive(Debug,Clone)] @@ -22,6 +24,7 @@ pub struct Fridge { params: Params, config: Config, + testmode: bool, on: bool, temp_wort: Option, @@ -43,13 +46,25 @@ } fn pre_start(&mut self, ctx: &Context) { - let chan: ChannelRef = channel("readings", &ctx.system).unwrap(); + + let params_chan : ChannelRef = channel("params", ctx).unwrap(); + let sensor_chan : ChannelRef = channel("readings", ctx).unwrap(); let sub = Box::new(ctx.myself()); - chan.tell(Subscribe {actor: sub, topic: "readings".into()}, None); + params_chan.tell(Subscribe {actor: sub.clone(), topic: "params".into()}, None); + sensor_chan.tell(Subscribe {actor: sub.clone(), topic: "readings".into()}, None); + - let chan: ChannelRef = channel("params", &ctx.system).unwrap(); - let sub = Box::new(ctx.myself()); - chan.tell(Subscribe {actor: sub, topic: "params".into()}, None); + // XXX a better way to get own reference? + let props = Props::new_args(params::ParamWaiter::new_actor, (self.config.clone(), params_chan)); + ctx.actor_of(props, "paramwaiter").unwrap(); + + if self.testmode { + let props = Props::new_args(sensor::TestSensor::new_actor, (self.config.clone(), sensor_chan)); + ctx.actor_of(props, "sensor").unwrap() + } else { + let props = Props::new_args(sensor::OneWireSensor::new_actor, (self.config.clone(), sensor_chan)); + ctx.actor_of(props, "sensor").unwrap() + }; self.tick(ctx); } @@ -109,11 +124,14 @@ } impl Fridge { - pub fn new_actor((config, nowait) : (Config, bool)) -> Fridge { - Self::new(config, nowait) + pub fn new_actor((config, testmode, nowait) + : (Config, bool, bool)) -> Fridge { + Self::new(config, testmode, nowait) } - pub fn new(config: Config, nowait: bool) -> Fridge { + pub fn new(config: Config, + testmode: bool, + nowait: bool) -> Fridge { let mut f = Fridge { config: config.clone(), params: Params::defaults(), @@ -124,6 +142,7 @@ wort_valid_time: Instant::now() - Duration::new(config.fridge_wort_invalid_time, 100), integrator: StepIntegrator::new(Duration::new(1, 0)), control: Self::make_control(&config), + testmode: testmode, }; if nowait { diff -r 1e147b3c2c55 -r a9f353f488d0 rust/src/main.rs --- a/rust/src/main.rs Sat Nov 09 11:35:43 2019 +0800 +++ b/rust/src/main.rs Sat Nov 09 11:35:59 2019 +0800 @@ -1,4 +1,3 @@ -#![feature(async_closure)] #[macro_use] extern crate log; // riker has its own logging? //extern crate env_logger; @@ -12,6 +11,7 @@ mod types; mod params; +use std::time::Duration; use riker::actors::*; @@ -24,20 +24,14 @@ let cf = config::Config::load(conf_file)?; let sys = ActorSystem::new().unwrap(); - let props = Props::new_args(params::ParamWaiter::new, cf.clone()); - sys.actor_of(props, "paramwaiter").unwrap(); + + let props = Props::new_args(fridge::Fridge::new_actor, (cf.clone(), testmode, nowait)); + sys.actor_of(props, "fridge").unwrap(); - if testmode { - let props = Props::new_args(sensor::TestSensor::new, cf.clone()); - sys.actor_of(props, "sensor").unwrap() - } else { - let props = Props::new_args(sensor::OneWireSensor::new, cf.clone()); - sys.actor_of(props, "sensor").unwrap() - }; - - let props = Props::new_args(fridge::Fridge::new_actor, (cf.clone(), nowait)); - sys.actor_of(props, "fridge").unwrap(); - Ok(()) + loop { + // TODO: wait for a semaphore or something? + std::thread::sleep(Duration::from_millis(60000)); + } } #[derive(Debug, StructOpt)] diff -r 1e147b3c2c55 -r a9f353f488d0 rust/src/params.rs --- a/rust/src/params.rs Sat Nov 09 11:35:43 2019 +0800 +++ b/rust/src/params.rs Sat Nov 09 11:35:59 2019 +0800 @@ -69,7 +69,7 @@ // last_etag is used for long-polling. last_etag: RefCell, epoch: String, - chan: Option>, // TODO: a way to avoid Option? + notify: ChannelRef, config: Config, } @@ -79,8 +79,11 @@ const TIMEOUT_MINUTES: u64 = 5; impl ParamWaiter { + pub fn new_actor((config, notify): (Config, ChannelRef)) -> Self { + Self::new(config, notify) + } - pub fn new(config: Config) -> Self { + pub fn new(config: Config, notify: ChannelRef) -> Self { let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 OsRng.fill_bytes(&mut b); let epoch = base64::encode(&b); @@ -89,12 +92,13 @@ limitlog: NotTooOften::new(LOG_MINUTES*60), last_etag: RefCell::new(String::new()), epoch: epoch, - chan: None, config: config, + notify: notify, } } async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { + debug!("wait_updates {}", uri); let req = hyper::Request::get(uri) .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) .body(hyper::Body::from("")).unwrap(); @@ -147,9 +151,13 @@ fn write_params(&self, params: &Params) { let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite); - p.write(|f| { + let r = p.write(|f| { serde_json::to_writer(f, params) }); + if let Err(e) = r { + // XXX notify? + error!("Couldn't write to {}: {}", self.config.params_file, e) + }; } fn do_poll(&mut self, ctx: &Context<::Msg>) -> Result<(), TemplogError> { @@ -160,7 +168,7 @@ }).expect("spawn failed"); // XXX error handling let (chunk, stat) = block_on(h)?; let new_params = self.handle_response(chunk, stat)?; - self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None); + self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None); Ok(()) } } @@ -186,7 +194,6 @@ } fn pre_start(&mut self, ctx: &Context) { - self.chan = Some(channel("params", &ctx.system).unwrap()); ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); } } diff -r 1e147b3c2c55 -r a9f353f488d0 rust/src/sensor.rs --- a/rust/src/sensor.rs Sat Nov 09 11:35:43 2019 +0800 +++ b/rust/src/sensor.rs Sat Nov 09 11:35:59 2019 +0800 @@ -13,13 +13,13 @@ pub struct OneWireSensor { config: Config, - chan: Option>, + notify: ChannelRef, } // #[derive(Clone)] pub struct TestSensor { config: Config, - chan: Option>, + notify: ChannelRef, } #[derive(Debug,Clone)] @@ -32,21 +32,24 @@ _ctx: &Context, _msg: Self::Msg, _sender: Sender) { - self.chan.as_ref().unwrap().tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None); + self.notify.tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None); } fn pre_start(&mut self, ctx: &Context) { - self.chan = Some(channel("readings", &ctx.system).unwrap()); let dur = Duration::new(self.config.sensor_sleep,0); ctx.schedule(Duration::from_millis(0), dur, ctx.myself(), None, SendReading); } } impl OneWireSensor { - pub fn new(config: Config) -> Self { + pub fn new_actor((config, notify): (Config, ChannelRef)) -> Self { + Self::new(config, notify) + } + + pub fn new(config: Config, notify: ChannelRef) -> Self { OneWireSensor { config: config.clone(), - chan: None, + notify: notify, } } @@ -104,21 +107,25 @@ _ctx: &Context, _msg: Self::Msg, _sender: Sender) { - self.chan.as_ref().unwrap().tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None); + self.notify.tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None); } fn pre_start(&mut self, ctx: &Context) { - self.chan = Some(channel("readings", &ctx.system).unwrap()); + info!("pre_start testsensor readings"); let dur = Duration::new(self.config.sensor_sleep,0); ctx.schedule(Duration::from_millis(0), dur, ctx.myself(), None, SendReading); } } impl TestSensor { - pub fn new(config: Config) -> Self { + pub fn new_actor((config, notify): (Config, ChannelRef)) -> Self { + Self::new(config, notify) + } + + pub fn new(config: Config, notify: ChannelRef) -> Self { TestSensor { config: config.clone(), - chan: None, + notify: notify, } }