diff rust/src/params.rs @ 638:a9f353f488d0 rust

fix channels
author Matt Johnston <matt@ucc.asn.au>
date Sat, 09 Nov 2019 11:35:59 +0800
parents 43eb3cfdf769
children 89818a14648b
line wrap: on
line diff
--- a/rust/src/params.rs	Sat Nov 09 11:35:43 2019 +0800
+++ b/rust/src/params.rs	Sat Nov 09 11:35:59 2019 +0800
@@ -69,7 +69,7 @@
     // last_etag is used for long-polling.
     last_etag: RefCell<String>,
     epoch: String,
-    chan: Option<ChannelRef<Params>>, // TODO: a way to avoid Option?
+    notify: ChannelRef<Params>,
 
     config: Config,
 }
@@ -79,8 +79,11 @@
 const TIMEOUT_MINUTES: u64 = 5;
 
 impl ParamWaiter {
+    pub fn new_actor((config, notify): (Config, ChannelRef<Params>)) -> Self {
+        Self::new(config, notify)
+    }
 
-    pub fn new(config: Config) -> Self {
+    pub fn new(config: Config, notify: ChannelRef<Params>) -> Self {
         let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
         OsRng.fill_bytes(&mut b);
         let epoch = base64::encode(&b);
@@ -89,12 +92,13 @@
             limitlog: NotTooOften::new(LOG_MINUTES*60),
             last_etag: RefCell::new(String::new()),
             epoch: epoch,
-            chan: None,
             config: config,
+            notify: notify,
         }
     }
 
     async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> {
+        debug!("wait_updates {}", uri);
         let req = hyper::Request::get(uri)
             .header(hyper::header::ETAG, etag)//*self.last_etag.borrow())
             .body(hyper::Body::from("")).unwrap();
@@ -147,9 +151,13 @@
 
     fn write_params(&self, params: &Params) {
         let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite);
-        p.write(|f| {
+        let r = p.write(|f| {
             serde_json::to_writer(f, params)
         });
+        if let Err(e) = r {
+            // XXX notify?
+            error!("Couldn't write to {}: {}", self.config.params_file, e)
+        };
     }
 
     fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> {
@@ -160,7 +168,7 @@
         }).expect("spawn failed"); // XXX error handling
         let (chunk, stat) = block_on(h)?;
         let new_params = self.handle_response(chunk, stat)?;
-        self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None);
+        self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None);
         Ok(())
     }
 }
@@ -186,7 +194,6 @@
     }
 
     fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
-        self.chan = Some(channel("params", &ctx.system).unwrap());
         ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams);
     }
 }