# HG changeset patch # User Matt Johnston # Date 1488985694 -28800 # Node ID a85c0c9bc1fa48974edb5c16b6be91fd188c0257 # Parent f153aec221be06cba0bebf18211f255224f5f5ec hide epoch in ParamWaiter make_request handles the buffer too diff -r f153aec221be -r a85c0c9bc1fa rust/Cargo.lock --- a/rust/Cargo.lock Tue Mar 07 23:56:12 2017 +0800 +++ b/rust/Cargo.lock Wed Mar 08 23:08:14 2017 +0800 @@ -2,6 +2,7 @@ name = "wort-templog" version = "0.1.0" dependencies = [ + "base64 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "curl 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "docopt 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -38,6 +39,19 @@ ] [[package]] +name = "base64" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "byteorder" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "cfg-if" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -519,6 +533,8 @@ [metadata] "checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66" "checksum aho-corasick 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0638fd549427caa90c499814196d1b9e3725eb4d15d7339d6de073a680ed0ca2" +"checksum base64 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065a0ce220ab84d0b6d5ae3e7bb77232209519c366f51f946fe28c19e84989d0" +"checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8" "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c" "checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum curl 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c90e1240ef340dd4027ade439e5c7c2064dd9dc652682117bd50d1486a3add7b" diff -r f153aec221be -r a85c0c9bc1fa rust/Cargo.toml --- a/rust/Cargo.toml Tue Mar 07 23:56:12 2017 +0800 +++ b/rust/Cargo.toml Wed Mar 08 23:08:14 2017 +0800 @@ -21,4 +21,4 @@ toml = "0.3" curl = "0.4" serde_json = "0.9" - +base64 = "0.4.0" diff -r f153aec221be -r a85c0c9bc1fa rust/src/main.rs --- a/rust/src/main.rs Tue Mar 07 23:56:12 2017 +0800 +++ b/rust/src/main.rs Wed Mar 08 23:08:14 2017 +0800 @@ -42,7 +42,6 @@ let handle = core.handle(); let params = params::Params::load(&config); - let epoch = params.epoch.clone(); let mut fridge = fridge::Fridge::new(&config, nowait, params, &handle); let (fridge_reading_s, fridge_reading_r) = mpsc::channel(1); @@ -72,7 +71,7 @@ r }); - let param_stream = params::ParamWaiter::stream(config, epoch, &handle); + let param_stream = params::ParamWaiter::stream(config, &handle); let p = param_stream.map(|p| { fridge::Message::Params(p) }); diff -r f153aec221be -r a85c0c9bc1fa rust/src/params.rs --- a/rust/src/params.rs Tue Mar 07 23:56:12 2017 +0800 +++ b/rust/src/params.rs Wed Mar 08 23:08:14 2017 +0800 @@ -2,6 +2,7 @@ extern crate futures; extern crate rand; extern crate serde_json; +extern crate base64; use std::time::Duration; use std::io; @@ -17,6 +18,7 @@ use futures::{Stream,Future,future}; use curl::easy::Easy; use curl::easy; +use self::rand::Rng; use types::*; use ::Config; @@ -31,9 +33,6 @@ pub nowort: bool, pub fridge_range_lower: f32, pub fridge_range_upper: f32, - - #[serde(skip_serializing)] - pub epoch: String, } impl Params { @@ -47,28 +46,33 @@ nowort: false, fridge_range_lower: 3.0, fridge_range_upper: 3.0, - epoch: String::new(), } } pub fn load(config: &Config) -> Params { // generate random epoch on success. // TODO: return failure? or just return default() ? + let mut p = Params::defaults(); + unimplemented!(); } } #[derive(Deserialize, Debug)] struct Response { - epoch_tag: String, + // sent as an opaque etag: header. Has format "epoch-nonce", + // responses where the epoch do not match ParamWaiter::epoch are dropped + etag: String, params: Params, } #[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, - epoch_tag: RefCell, + // last_etag is used for long-polling. + last_etag: RefCell, session: RefCell>, + epoch: String, config: Config, handle: Handle, @@ -79,17 +83,31 @@ const TIMEOUT_MINUTES: u64 = 5; impl ParamWaiter { - fn make_request(&self) -> Easy { + fn make_request(&self) -> (Easy, Arc>>) { let mut req = Easy::new(); req.get(true).unwrap(); - // supposedly req.url won't fail, checking is later? + // supposedly req.url won't fail, checking happens later? req.url(&self.config.SETTINGS_URL).unwrap(); req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); + // store response + let max_response_size = 10000; + let buf = Arc::new(Mutex::new(Vec::new())); + let dst = buf.clone(); + req.write_function(move |data| { + let mut dst = dst.lock().unwrap(); + dst.extend_from_slice(data); + if dst.len() > max_response_size { + error!("Too large params response from server: {}", dst.len()); + Ok(0) + } else { + Ok(data.len()) + } + }).unwrap(); + // http header - // etag: epoch-tag - let e = self.epoch_tag.borrow(); + let e = self.last_etag.borrow(); if !e.is_empty() { let mut list = easy::List::new(); let hd = format!("etag: {}", *e); @@ -97,7 +115,7 @@ req.http_headers(list).unwrap(); } - req + (req, buf) } fn handle_response(&self, req: &mut Easy, buf: &Vec) -> Result, TemplogError> { @@ -107,7 +125,7 @@ 200 => { // new params let r: Response = serde_json::from_str(&text)?; - *self.epoch_tag.borrow_mut() = r.epoch_tag; + *self.last_etag.borrow_mut() = r.etag; Ok(Some(r.params)) } 304 => Ok(None), // unmodified (long polling timeout at the server) @@ -122,23 +140,10 @@ if rself.session.borrow().is_none() { *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); } - let ses = rself.session.borrow().clone().unwrap(); - - let mut req = rself.make_request(); - let buf = Arc::new(Mutex::new(Vec::new())); - let dst = buf.clone(); - req.write_function(move |data| { - let mut dst = dst.lock().unwrap(); - dst.extend_from_slice(data); - if dst.len() > MAX_RESPONSE_SIZE { - debug!("Too large params response from server: {}", dst.len()); - Ok(0) - } else { - Ok(data.len()) - } - }).unwrap(); + let (mut req, buf) = rself.make_request(); + let ses = rself.session.borrow().clone().unwrap(); let s = ses.perform(req) .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) .and_then(move |mut rq| { @@ -154,18 +159,22 @@ */ } - fn new(config: &Config, epoch: String, handle: &Handle) -> Self { + fn new(config: &Config, handle: &Handle) -> Self { + let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 + rand::OsRng::new().unwrap().fill_bytes(&mut b); + let epoch = base64::encode(&b); ParamWaiter { limitlog: NotTooOften::new(LOG_MINUTES*60), - epoch_tag: RefCell::new(epoch), + last_etag: RefCell::new(String::new()), session: RefCell::new(None), + epoch: epoch, config: config.clone(), handle: handle.clone(), } } - pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box> { - let rcself = Rc::new(ParamWaiter::new(config, epoch, handle)); + pub fn stream(config: &Config, handle: &Handle) -> Box> { + let rcself = Rc::new(ParamWaiter::new(config, handle)); let dur = Duration::from_millis(4000); let i = Interval::new(dur, &rcself.handle).unwrap()