Mercurial > templog
diff rust/src/params.rs @ 616:a85c0c9bc1fa rust
hide epoch in ParamWaiter
make_request handles the buffer too
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 08 Mar 2017 23:08:14 +0800 |
parents | f153aec221be |
children | 2d65a9f0bed3 |
line wrap: on
line diff
--- a/rust/src/params.rs Tue Mar 07 23:56:12 2017 +0800 +++ b/rust/src/params.rs Wed Mar 08 23:08:14 2017 +0800 @@ -2,6 +2,7 @@ extern crate futures; extern crate rand; extern crate serde_json; +extern crate base64; use std::time::Duration; use std::io; @@ -17,6 +18,7 @@ use futures::{Stream,Future,future}; use curl::easy::Easy; use curl::easy; +use self::rand::Rng; use types::*; use ::Config; @@ -31,9 +33,6 @@ pub nowort: bool, pub fridge_range_lower: f32, pub fridge_range_upper: f32, - - #[serde(skip_serializing)] - pub epoch: String, } impl Params { @@ -47,28 +46,33 @@ nowort: false, fridge_range_lower: 3.0, fridge_range_upper: 3.0, - epoch: String::new(), } } pub fn load(config: &Config) -> Params { // generate random epoch on success. // TODO: return failure? or just return default() ? + let mut p = Params::defaults(); + unimplemented!(); } } #[derive(Deserialize, Debug)] struct Response { - epoch_tag: String, + // sent as an opaque etag: header. Has format "epoch-nonce", + // responses where the epoch do not match ParamWaiter::epoch are dropped + etag: String, params: Params, } #[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, - epoch_tag: RefCell<String>, + // last_etag is used for long-polling. + last_etag: RefCell<String>, session: RefCell<Option<Session>>, + epoch: String, config: Config, handle: Handle, @@ -79,17 +83,31 @@ const TIMEOUT_MINUTES: u64 = 5; impl ParamWaiter { - fn make_request(&self) -> Easy { + fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) { let mut req = Easy::new(); req.get(true).unwrap(); - // supposedly req.url won't fail, checking is later? + // supposedly req.url won't fail, checking happens later? req.url(&self.config.SETTINGS_URL).unwrap(); req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); + // store response + let max_response_size = 10000; + let buf = Arc::new(Mutex::new(Vec::new())); + let dst = buf.clone(); + req.write_function(move |data| { + let mut dst = dst.lock().unwrap(); + dst.extend_from_slice(data); + if dst.len() > max_response_size { + error!("Too large params response from server: {}", dst.len()); + Ok(0) + } else { + Ok(data.len()) + } + }).unwrap(); + // http header - // etag: epoch-tag - let e = self.epoch_tag.borrow(); + let e = self.last_etag.borrow(); if !e.is_empty() { let mut list = easy::List::new(); let hd = format!("etag: {}", *e); @@ -97,7 +115,7 @@ req.http_headers(list).unwrap(); } - req + (req, buf) } fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { @@ -107,7 +125,7 @@ 200 => { // new params let r: Response = serde_json::from_str(&text)?; - *self.epoch_tag.borrow_mut() = r.epoch_tag; + *self.last_etag.borrow_mut() = r.etag; Ok(Some(r.params)) } 304 => Ok(None), // unmodified (long polling timeout at the server) @@ -122,23 +140,10 @@ if rself.session.borrow().is_none() { *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); } - let ses = rself.session.borrow().clone().unwrap(); - - let mut req = rself.make_request(); - let buf = Arc::new(Mutex::new(Vec::new())); - let dst = buf.clone(); - req.write_function(move |data| { - let mut dst = dst.lock().unwrap(); - dst.extend_from_slice(data); - if dst.len() > MAX_RESPONSE_SIZE { - debug!("Too large params response from server: {}", dst.len()); - Ok(0) - } else { - Ok(data.len()) - } - }).unwrap(); + let (mut req, buf) = rself.make_request(); + let ses = rself.session.borrow().clone().unwrap(); let s = ses.perform(req) .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) .and_then(move |mut rq| { @@ -154,18 +159,22 @@ */ } - fn new(config: &Config, epoch: String, handle: &Handle) -> Self { + fn new(config: &Config, handle: &Handle) -> Self { + let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 + rand::OsRng::new().unwrap().fill_bytes(&mut b); + let epoch = base64::encode(&b); ParamWaiter { limitlog: NotTooOften::new(LOG_MINUTES*60), - epoch_tag: RefCell::new(epoch), + last_etag: RefCell::new(String::new()), session: RefCell::new(None), + epoch: epoch, config: config.clone(), handle: handle.clone(), } } - pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { - let rcself = Rc::new(ParamWaiter::new(config, epoch, handle)); + pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { + let rcself = Rc::new(ParamWaiter::new(config, handle)); let dur = Duration::from_millis(4000); let i = Interval::new(dur, &rcself.handle).unwrap()