Mercurial > templog
diff rust/src/params.rs @ 633:490e9e15b98c rust
move some bits to riker
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 04 Sep 2019 23:24:13 +0800 |
parents | bde302def78e |
children | a5721c02d3ee |
line wrap: on
line diff
--- a/rust/src/params.rs Thu Aug 22 23:59:50 2019 +0800 +++ b/rust/src/params.rs Wed Sep 04 23:24:13 2019 +0800 @@ -8,15 +8,19 @@ use std::fs::File; use std::io::Read; +use serde::{Serialize,Deserialize}; -use rand::Rng; +use rand::rngs::{StdRng, OsRng}; +use rand::{RngCore, SeedableRng}; + use std::str::FromStr; use hyper; -use hyper::header::{Headers, ETag, EntityTag}; use hyper::client::Client; -use crate::types::*; -use ::Config; +use riker::actors::*; + +use super::types::*; +use super::config::Config; #[derive(Deserialize, Serialize, Debug, Clone)] pub struct Params { @@ -54,25 +58,17 @@ Self::try_load(&config.PARAMS_FILE) .unwrap_or_else(|_| Params::defaults()) } + } -#[derive(Deserialize, Debug)] -struct Response { - // sent as an opaque etag: header. Has format "epoch-nonce", - // responses where the epoch do not match ParamWaiter::epoch are dropped - etag: String, - params: Params, -} - -#[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, // last_etag is used for long-polling. last_etag: RefCell<String>, epoch: String, + chan: ChannelRef<Params>, config: Config, - handle: Handle, } const LOG_MINUTES: u64 = 15; @@ -80,20 +76,47 @@ const TIMEOUT_MINUTES: u64 = 5; impl ParamWaiter { - fn make_request(&self) -> hyper::client::FutureResponse { - let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); - let mut req = hyper::client::Request::new(hyper::Method::Get, uri); - { - let headers = req.headers_mut(); - headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); + + fn new(config: Config) -> Self { + let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 + OsRng.fill_bytes(&mut b); + let epoch = base64::encode(&b); + + ParamWaiter { + limitlog: NotTooOften::new(LOG_MINUTES*60), + last_etag: RefCell::new(String::new()), + epoch: epoch, + config: config, } - hyper::client::Client::new(&self.handle).request(req) - // XXX how do we do this? - // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); } + async fn keep_waiting(&mut self) { + loop { + self.wait_updates().await; + } + } + + async fn wait_updates(&mut self) { + + let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); + let mut req = hyper::Request::new(hyper::Method::Get, uri); + req.headers_mut().insert(hyper::header::ETAG, self.last_etag.borrow()); + let resp = hyper::Client::new(&self.handle).request(req).await?; + let b = resp.body().concat2().await?; + let new_params = self.handle_response(b)?; + self.chan.tell(Publish{msg: new_params, topic: "params".into()}, None); + } + fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> { + #[derive(Deserialize, Debug)] + struct Response { + // sent as an opaque etag: header. Has format "epoch-nonce", + // responses where the epoch do not match ParamWaiter::epoch are dropped + etag: String, + params: Params, + } + let text = String::from_utf8_lossy(buf.as_ref()); match status { hyper::StatusCode::Ok => { @@ -129,37 +152,30 @@ serde_json::to_writer(f, params) }); } +} - pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { - let rcself = Rc::new(ParamWaiter::new(config, handle)); +impl Actor for ParamWaiter { - let dur = Duration::from_millis(4000); - for _ in Interval::new(dur, &rcself.handle).unwrap() { - // fetch params - // TODO - skip if inflight. - let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?; - let status = r.status(); - let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?; - if let Ok(params) = rcself.handle_response(b, status) { - stream_yield!(params); - } - } - Ok(()) + fn post_start(&mut self, ctx: &Context<Self::Msg>) { + self.chan = channel("params", &ctx.system).unwrap(); + ctx.run(self.wait_updates()); } - - fn new(config: Config, handle: Handle) -> Self { - let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 - rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b); - let epoch = base64::encode(&b); - - ParamWaiter { - limitlog: NotTooOften::new(LOG_MINUTES*60), - last_etag: RefCell::new(String::new()), - epoch: epoch, - config: config, - handle: handle, - } - } - } + // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { + // let rcself = Rc::new(ParamWaiter::new(config, handle)); + + // let dur = Duration::from_millis(4000); + // for _ in Interval::new(dur, &rcself.handle).unwrap() { + // // fetch params + // // TODO - skip if inflight. + // let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?; + // let status = r.status(); + // let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?; + // if let Ok(params) = rcself.handle_response(b, status) { + // stream_yield!(params); + // } + // } + // Ok(()) + // } +