Mercurial > templog
diff rust/src/params.rs @ 638:a9f353f488d0 rust
fix channels
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Sat, 09 Nov 2019 11:35:59 +0800 |
parents | 43eb3cfdf769 |
children | 89818a14648b |
line wrap: on
line diff
--- a/rust/src/params.rs Sat Nov 09 11:35:43 2019 +0800 +++ b/rust/src/params.rs Sat Nov 09 11:35:59 2019 +0800 @@ -69,7 +69,7 @@ // last_etag is used for long-polling. last_etag: RefCell<String>, epoch: String, - chan: Option<ChannelRef<Params>>, // TODO: a way to avoid Option? + notify: ChannelRef<Params>, config: Config, } @@ -79,8 +79,11 @@ const TIMEOUT_MINUTES: u64 = 5; impl ParamWaiter { + pub fn new_actor((config, notify): (Config, ChannelRef<Params>)) -> Self { + Self::new(config, notify) + } - pub fn new(config: Config) -> Self { + pub fn new(config: Config, notify: ChannelRef<Params>) -> Self { let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 OsRng.fill_bytes(&mut b); let epoch = base64::encode(&b); @@ -89,12 +92,13 @@ limitlog: NotTooOften::new(LOG_MINUTES*60), last_etag: RefCell::new(String::new()), epoch: epoch, - chan: None, config: config, + notify: notify, } } async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { + debug!("wait_updates {}", uri); let req = hyper::Request::get(uri) .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) .body(hyper::Body::from("")).unwrap(); @@ -147,9 +151,13 @@ fn write_params(&self, params: &Params) { let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite); - p.write(|f| { + let r = p.write(|f| { serde_json::to_writer(f, params) }); + if let Err(e) = r { + // XXX notify? + error!("Couldn't write to {}: {}", self.config.params_file, e) + }; } fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> { @@ -160,7 +168,7 @@ }).expect("spawn failed"); // XXX error handling let (chunk, stat) = block_on(h)?; let new_params = self.handle_response(chunk, stat)?; - self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None); + self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None); Ok(()) } } @@ -186,7 +194,6 @@ } fn pre_start(&mut self, ctx: &Context<Self::Msg>) { - self.chan = Some(channel("params", &ctx.system).unwrap()); ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); } }