changeset 638:a9f353f488d0 rust

fix channels
author Matt Johnston <matt@ucc.asn.au>
date Sat, 09 Nov 2019 11:35:59 +0800
parents 1e147b3c2c55
children 89818a14648b
files rust/src/fridge.rs rust/src/main.rs rust/src/params.rs rust/src/sensor.rs
diffstat 4 files changed, 66 insertions(+), 39 deletions(-) [+]
line wrap: on
line diff
--- a/rust/src/fridge.rs	Sat Nov 09 11:35:43 2019 +0800
+++ b/rust/src/fridge.rs	Sat Nov 09 11:35:59 2019 +0800
@@ -11,8 +11,10 @@
 #[cfg(target_os = "linux")]
 use self::sysfs_gpio::{Direction, Pin};
 
+use crate::params::Params;
 use super::config::Config;
-use super::params::Params;
+use super::params;
+use super::sensor;
 use super::types::*;
 
 #[derive(Debug,Clone)]
@@ -22,6 +24,7 @@
 pub struct Fridge {
     params: Params,
     config: Config,
+    testmode: bool,
 
     on: bool,
     temp_wort: Option<f32>,
@@ -43,13 +46,25 @@
     }
 
     fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
-        let chan: ChannelRef<Readings> = channel("readings", &ctx.system).unwrap();
+
+        let params_chan : ChannelRef<Params> = channel("params", ctx).unwrap();
+        let sensor_chan : ChannelRef<Readings> = channel("readings", ctx).unwrap();
         let sub = Box::new(ctx.myself());
-        chan.tell(Subscribe {actor: sub, topic: "readings".into()}, None);
+        params_chan.tell(Subscribe {actor: sub.clone(), topic: "params".into()}, None);
+        sensor_chan.tell(Subscribe {actor: sub.clone(), topic: "readings".into()}, None);
+
 
-        let chan: ChannelRef<Params> = channel("params", &ctx.system).unwrap();
-        let sub = Box::new(ctx.myself());
-        chan.tell(Subscribe {actor: sub, topic: "params".into()}, None);
+        // XXX a better way to get own reference?
+        let props = Props::new_args(params::ParamWaiter::new_actor, (self.config.clone(), params_chan));
+        ctx.actor_of(props, "paramwaiter").unwrap();
+
+        if self.testmode {
+            let props = Props::new_args(sensor::TestSensor::new_actor, (self.config.clone(), sensor_chan));
+            ctx.actor_of(props, "sensor").unwrap()
+        } else {
+            let props = Props::new_args(sensor::OneWireSensor::new_actor, (self.config.clone(), sensor_chan));
+            ctx.actor_of(props, "sensor").unwrap()
+        };
 
         self.tick(ctx);
     }
@@ -109,11 +124,14 @@
 }
 
 impl Fridge {
-    pub fn new_actor((config, nowait) : (Config, bool)) -> Fridge {
-        Self::new(config, nowait)
+    pub fn new_actor((config, testmode, nowait) 
+        : (Config, bool, bool)) -> Fridge {
+        Self::new(config, testmode, nowait)
 
     }
-    pub fn new(config: Config, nowait: bool) -> Fridge {
+    pub fn new(config: Config, 
+            testmode: bool,
+            nowait: bool) -> Fridge {
         let mut f = Fridge { 
             config: config.clone(),
             params: Params::defaults(),
@@ -124,6 +142,7 @@
             wort_valid_time: Instant::now() - Duration::new(config.fridge_wort_invalid_time, 100),
             integrator: StepIntegrator::new(Duration::new(1, 0)),
             control: Self::make_control(&config),
+            testmode: testmode,
         };
 
         if nowait {
--- a/rust/src/main.rs	Sat Nov 09 11:35:43 2019 +0800
+++ b/rust/src/main.rs	Sat Nov 09 11:35:59 2019 +0800
@@ -1,4 +1,3 @@
-#![feature(async_closure)]
 #[macro_use] extern crate log;
 // riker has its own logging?
 //extern crate env_logger;
@@ -12,6 +11,7 @@
 mod types;
 mod params;
 
+use std::time::Duration;
 
 use riker::actors::*;
 
@@ -24,20 +24,14 @@
     let cf = config::Config::load(conf_file)?;
 
     let sys = ActorSystem::new().unwrap();
-    let props = Props::new_args(params::ParamWaiter::new, cf.clone());
-    sys.actor_of(props, "paramwaiter").unwrap();
+
+    let props = Props::new_args(fridge::Fridge::new_actor, (cf.clone(), testmode, nowait));
+    sys.actor_of(props, "fridge").unwrap();
 
-    if testmode {
-        let props = Props::new_args(sensor::TestSensor::new, cf.clone());
-        sys.actor_of(props, "sensor").unwrap()
-    } else {
-        let props = Props::new_args(sensor::OneWireSensor::new, cf.clone());
-        sys.actor_of(props, "sensor").unwrap()
-    };
-
-    let props = Props::new_args(fridge::Fridge::new_actor, (cf.clone(), nowait));
-    sys.actor_of(props, "fridge").unwrap();
-    Ok(())
+    loop {
+        // TODO: wait for a semaphore or something?
+        std::thread::sleep(Duration::from_millis(60000));
+   }
 }
 
 #[derive(Debug, StructOpt)]
--- a/rust/src/params.rs	Sat Nov 09 11:35:43 2019 +0800
+++ b/rust/src/params.rs	Sat Nov 09 11:35:59 2019 +0800
@@ -69,7 +69,7 @@
     // last_etag is used for long-polling.
     last_etag: RefCell<String>,
     epoch: String,
-    chan: Option<ChannelRef<Params>>, // TODO: a way to avoid Option?
+    notify: ChannelRef<Params>,
 
     config: Config,
 }
@@ -79,8 +79,11 @@
 const TIMEOUT_MINUTES: u64 = 5;
 
 impl ParamWaiter {
+    pub fn new_actor((config, notify): (Config, ChannelRef<Params>)) -> Self {
+        Self::new(config, notify)
+    }
 
-    pub fn new(config: Config) -> Self {
+    pub fn new(config: Config, notify: ChannelRef<Params>) -> Self {
         let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
         OsRng.fill_bytes(&mut b);
         let epoch = base64::encode(&b);
@@ -89,12 +92,13 @@
             limitlog: NotTooOften::new(LOG_MINUTES*60),
             last_etag: RefCell::new(String::new()),
             epoch: epoch,
-            chan: None,
             config: config,
+            notify: notify,
         }
     }
 
     async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> {
+        debug!("wait_updates {}", uri);
         let req = hyper::Request::get(uri)
             .header(hyper::header::ETAG, etag)//*self.last_etag.borrow())
             .body(hyper::Body::from("")).unwrap();
@@ -147,9 +151,13 @@
 
     fn write_params(&self, params: &Params) {
         let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite);
-        p.write(|f| {
+        let r = p.write(|f| {
             serde_json::to_writer(f, params)
         });
+        if let Err(e) = r {
+            // XXX notify?
+            error!("Couldn't write to {}: {}", self.config.params_file, e)
+        };
     }
 
     fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> {
@@ -160,7 +168,7 @@
         }).expect("spawn failed"); // XXX error handling
         let (chunk, stat) = block_on(h)?;
         let new_params = self.handle_response(chunk, stat)?;
-        self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None);
+        self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None);
         Ok(())
     }
 }
@@ -186,7 +194,6 @@
     }
 
     fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
-        self.chan = Some(channel("params", &ctx.system).unwrap());
         ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams);
     }
 }
--- a/rust/src/sensor.rs	Sat Nov 09 11:35:43 2019 +0800
+++ b/rust/src/sensor.rs	Sat Nov 09 11:35:59 2019 +0800
@@ -13,13 +13,13 @@
 
 pub struct OneWireSensor {
     config: Config,
-    chan: Option<ChannelRef<Readings>>,
+    notify: ChannelRef<Readings>,
 }
 
 // #[derive(Clone)]
 pub struct TestSensor {
     config: Config,
-    chan: Option<ChannelRef<Readings>>,
+    notify: ChannelRef<Readings>,
 }
 
 #[derive(Debug,Clone)]
@@ -32,21 +32,24 @@
             _ctx: &Context<Self::Msg>,
             _msg: Self::Msg,
             _sender: Sender) {
-        self.chan.as_ref().unwrap().tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None);
+        self.notify.tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None);
     }
 
     fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
-        self.chan = Some(channel("readings", &ctx.system).unwrap());
         let dur = Duration::new(self.config.sensor_sleep,0);
         ctx.schedule(Duration::from_millis(0), dur, ctx.myself(), None, SendReading);
     }
 }
 
 impl OneWireSensor {
-    pub fn new(config: Config) -> Self {
+    pub fn new_actor((config, notify): (Config, ChannelRef<Readings>)) -> Self {
+        Self::new(config, notify)
+    }
+
+    pub fn new(config: Config, notify: ChannelRef<Readings>) -> Self {
         OneWireSensor {
             config: config.clone(),
-            chan: None,
+            notify: notify,
         }
     }
 
@@ -104,21 +107,25 @@
             _ctx: &Context<Self::Msg>,
             _msg: Self::Msg,
             _sender: Sender) {
-        self.chan.as_ref().unwrap().tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None);
+        self.notify.tell(Publish{msg: self.get_readings(), topic: "readings".into()}, None);
     }
 
     fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
-        self.chan = Some(channel("readings", &ctx.system).unwrap());
+        info!("pre_start testsensor readings");
         let dur = Duration::new(self.config.sensor_sleep,0);
         ctx.schedule(Duration::from_millis(0), dur, ctx.myself(), None, SendReading);
     }
 }
 
 impl TestSensor {
-    pub fn new(config: Config) -> Self {
+    pub fn new_actor((config, notify): (Config, ChannelRef<Readings>)) -> Self {
+        Self::new(config, notify)
+    }
+
+    pub fn new(config: Config, notify: ChannelRef<Readings>) -> Self {
         TestSensor {
             config: config.clone(),
-            chan: None,
+            notify: notify,
         }
     }