Mercurial > templog
diff rust/src/params.rs @ 634:a5721c02d3ee rust
build succeeds
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Sun, 22 Sep 2019 20:35:40 +0800 |
parents | 490e9e15b98c |
children | 4424a8b30f9c |
line wrap: on
line diff
--- a/rust/src/params.rs Wed Sep 04 23:24:13 2019 +0800 +++ b/rust/src/params.rs Sun Sep 22 20:35:40 2019 +0800 @@ -1,21 +1,26 @@ use std::time::Duration; -use std::io; + use std::str; -use std::rc::Rc; -use std::sync::{Arc,Mutex}; -use std::error::Error; -use std::cell::{Cell,RefCell}; + + + +use std::cell::{RefCell}; use std::fs::File; use std::io::Read; use serde::{Serialize,Deserialize}; -use rand::rngs::{StdRng, OsRng}; -use rand::{RngCore, SeedableRng}; +use rand::rngs::{OsRng}; +use rand::{RngCore}; + + +use hyper; -use std::str::FromStr; -use hyper; -use hyper::client::Client; +// for try_concat() +use futures::stream::TryStreamExt; +use futures::executor::block_on; +// for block_on().or_else +use futures_util::try_future::TryFutureExt; use riker::actors::*; @@ -41,7 +46,7 @@ fridge_difference: 0.2, overshoot_delay: 720, // 12 minutes overshoot_factor: 1.0, - disabled: false, + disabled: true, nowort: false, fridge_range_lower: 3.0, fridge_range_upper: 3.0, @@ -66,7 +71,7 @@ // last_etag is used for long-polling. last_etag: RefCell<String>, epoch: String, - chan: ChannelRef<Params>, + chan: Option<ChannelRef<Params>>, // TODO: a way to avoid Option? config: Config, } @@ -77,7 +82,7 @@ impl ParamWaiter { - fn new(config: Config) -> Self { + pub fn new(config: Config) -> Self { let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 OsRng.fill_bytes(&mut b); let epoch = base64::encode(&b); @@ -86,29 +91,26 @@ limitlog: NotTooOften::new(LOG_MINUTES*60), last_etag: RefCell::new(String::new()), epoch: epoch, + chan: None, config: config, } } - - async fn keep_waiting(&mut self) { - loop { - self.wait_updates().await; - } - } + async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { + let req = hyper::Request::get(uri) + .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) + .body(hyper::Body::from("")).unwrap(); - async fn wait_updates(&mut self) { + // TODO timeout? + let resp = hyper::Client::new().request(req).await?; + let status = resp.status(); + let chunk = resp.into_body().try_concat().await?; - let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); - let mut req = hyper::Request::new(hyper::Method::Get, uri); - req.headers_mut().insert(hyper::header::ETAG, self.last_etag.borrow()); - let resp = hyper::Client::new(&self.handle).request(req).await?; - let b = resp.body().concat2().await?; - let new_params = self.handle_response(b)?; - self.chan.tell(Publish{msg: new_params, topic: "params".into()}, None); + Ok((chunk, status)) } fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> { + #[derive(Deserialize, Debug)] struct Response { // sent as an opaque etag: header. Has format "epoch-nonce", @@ -117,11 +119,10 @@ params: Params, } - let text = String::from_utf8_lossy(buf.as_ref()); match status { - hyper::StatusCode::Ok => { + hyper::StatusCode::OK => { // new params - let r: Response = serde_json::from_str(&text)?; + let r: Response = serde_json::from_slice(&buf)?; let mut le = self.last_etag.borrow_mut(); *le = r.etag; @@ -134,13 +135,13 @@ } Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) } - hyper::StatusCode::NotModified => { + hyper::StatusCode::NOT_MODIFIED => { // XXX this isn't really an error. Should handle_response() return // Result<Option<Params>, TemplogError> instead? - Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) }, _ => { + let text = String::from_utf8_lossy(buf.as_ref()); Err(TemplogError::new(&format!("Wrong server response code {}: {}", status.as_u16(), text))) }, } @@ -152,13 +153,41 @@ serde_json::to_writer(f, params) }); } + + fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> { + let url = self.config.SETTINGS_URL.clone(); + let etag = self.last_etag.borrow().clone(); + let h = ctx.run(async move { + Self::wait_updates(&url, &etag).await + }).expect("spawn failed"); // XXX error handling + let (chunk, stat) = block_on(h)?; + let new_params = self.handle_response(chunk, stat)?; + self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None); + Ok(()) + } } +#[derive(Clone,Debug)] +pub struct PollForParams; + impl Actor for ParamWaiter { + type Msg = PollForParams; - fn post_start(&mut self, ctx: &Context<Self::Msg>) { - self.chan = channel("params", &ctx.system).unwrap(); - ctx.run(self.wait_updates()); + fn recv(&mut self, + ctx: &Context<Self::Msg>, + _msg: Self::Msg, + _sender: Sender) { + // schedule a retry once this iteration finishes + ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); + + if let Err(e) = self.do_poll(ctx) { + warn!("Problem fetching params: {}", e); + } + } + + fn pre_start(&mut self, ctx: &Context<Self::Msg>) { + self.chan = Some(channel("params", &ctx.system).unwrap()); + ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); } }