changeset 616:a85c0c9bc1fa rust

hide epoch in ParamWaiter make_request handles the buffer too
author Matt Johnston <matt@ucc.asn.au>
date Wed, 08 Mar 2017 23:08:14 +0800 (2017-03-08)
parents f153aec221be
children 87a78343140e
files rust/Cargo.lock rust/Cargo.toml rust/src/main.rs rust/src/params.rs
diffstat 4 files changed, 58 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/rust/Cargo.lock	Tue Mar 07 23:56:12 2017 +0800
+++ b/rust/Cargo.lock	Wed Mar 08 23:08:14 2017 +0800
@@ -2,6 +2,7 @@
 name = "wort-templog"
 version = "0.1.0"
 dependencies = [
+ "base64 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "curl 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "docopt 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -38,6 +39,19 @@
 ]
 
 [[package]]
+name = "base64"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "byteorder"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "cfg-if"
 version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -519,6 +533,8 @@
 [metadata]
 "checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66"
 "checksum aho-corasick 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0638fd549427caa90c499814196d1b9e3725eb4d15d7339d6de073a680ed0ca2"
+"checksum base64 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "065a0ce220ab84d0b6d5ae3e7bb77232209519c366f51f946fe28c19e84989d0"
+"checksum byteorder 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c40977b0ee6b9885c9013cd41d9feffdd22deb3bb4dc3a71d901cc7a77de18c8"
 "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
 "checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97"
 "checksum curl 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c90e1240ef340dd4027ade439e5c7c2064dd9dc652682117bd50d1486a3add7b"
--- a/rust/Cargo.toml	Tue Mar 07 23:56:12 2017 +0800
+++ b/rust/Cargo.toml	Wed Mar 08 23:08:14 2017 +0800
@@ -21,4 +21,4 @@
 toml = "0.3"
 curl = "0.4"
 serde_json = "0.9"
-
+base64 = "0.4.0"
--- a/rust/src/main.rs	Tue Mar 07 23:56:12 2017 +0800
+++ b/rust/src/main.rs	Wed Mar 08 23:08:14 2017 +0800
@@ -42,7 +42,6 @@
     let handle = core.handle();
 
     let params = params::Params::load(&config);
-    let epoch = params.epoch.clone();
     let mut fridge = fridge::Fridge::new(&config, nowait, params, &handle);
 
     let (fridge_reading_s, fridge_reading_r) = mpsc::channel(1);
@@ -72,7 +71,7 @@
         r
     });
 
-    let param_stream = params::ParamWaiter::stream(config, epoch, &handle);
+    let param_stream = params::ParamWaiter::stream(config, &handle);
     let p = param_stream.map(|p| {
             fridge::Message::Params(p)
         });
--- a/rust/src/params.rs	Tue Mar 07 23:56:12 2017 +0800
+++ b/rust/src/params.rs	Wed Mar 08 23:08:14 2017 +0800
@@ -2,6 +2,7 @@
 extern crate futures;
 extern crate rand;
 extern crate serde_json;
+extern crate base64;
 
 use std::time::Duration;
 use std::io;
@@ -17,6 +18,7 @@
 use futures::{Stream,Future,future};
 use curl::easy::Easy;
 use curl::easy;
+use self::rand::Rng;
 
 use types::*;
 use ::Config;
@@ -31,9 +33,6 @@
     pub nowort: bool,
     pub fridge_range_lower: f32,
     pub fridge_range_upper: f32,
-
-    #[serde(skip_serializing)]
-    pub epoch: String,
 }
 
 impl Params {
@@ -47,28 +46,33 @@
             nowort: false,
             fridge_range_lower: 3.0,
             fridge_range_upper: 3.0,
-            epoch: String::new(),
             }
     }
 
     pub fn load(config: &Config) -> Params {
         // generate random epoch on success.
         // TODO: return failure? or just return default() ?
+        let mut p = Params::defaults();
+
         unimplemented!();
     }
 }
 
 #[derive(Deserialize, Debug)]
 struct Response {
-    epoch_tag: String,
+    // sent as an opaque etag: header. Has format "epoch-nonce",
+    // responses where the epoch do not match ParamWaiter::epoch are dropped
+    etag: String,
     params: Params,
 }
 
 #[derive(Clone)]
 pub struct ParamWaiter {
     limitlog: NotTooOften,
-    epoch_tag: RefCell<String>,
+    // last_etag is used for long-polling.
+    last_etag: RefCell<String>,
     session: RefCell<Option<Session>>,
+    epoch: String,
 
     config: Config,
     handle: Handle,
@@ -79,17 +83,31 @@
 const TIMEOUT_MINUTES: u64 = 5;
 
 impl ParamWaiter {
-    fn make_request(&self) -> Easy {
+    fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) {
         let mut req = Easy::new();
         req.get(true).unwrap();
-        // supposedly req.url won't fail, checking is later?
+        // supposedly req.url won't fail, checking happens later?
         req.url(&self.config.SETTINGS_URL).unwrap();
 
         req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
 
+        // store response
+        let max_response_size = 10000;
+        let buf = Arc::new(Mutex::new(Vec::new()));
+        let dst = buf.clone();
+        req.write_function(move |data| {
+            let mut dst = dst.lock().unwrap();
+            dst.extend_from_slice(data);
+            if dst.len() > max_response_size {
+                error!("Too large params response from server: {}", dst.len());
+                Ok(0)
+            } else {
+                Ok(data.len())
+            }
+        }).unwrap();
+
         // http header
-        //   etag: epoch-tag
-        let e = self.epoch_tag.borrow();
+        let e = self.last_etag.borrow();
         if !e.is_empty() {
             let mut list = easy::List::new();
             let hd = format!("etag: {}", *e);
@@ -97,7 +115,7 @@
             req.http_headers(list).unwrap();
         }
 
-        req
+        (req, buf)
     }
 
     fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
@@ -107,7 +125,7 @@
             200 => {
                 // new params
                 let r: Response = serde_json::from_str(&text)?;
-                *self.epoch_tag.borrow_mut() = r.epoch_tag;
+                *self.last_etag.borrow_mut() = r.etag;
                 Ok(Some(r.params))
             }
             304 => Ok(None), // unmodified (long polling timeout at the server)
@@ -122,23 +140,10 @@
         if rself.session.borrow().is_none() {
             *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
         }
-        let ses = rself.session.borrow().clone().unwrap();
-
-        let mut req = rself.make_request();
-        let buf = Arc::new(Mutex::new(Vec::new()));
 
-        let dst = buf.clone();
-        req.write_function(move |data| {
-            let mut dst = dst.lock().unwrap();
-            dst.extend_from_slice(data);
-            if dst.len() > MAX_RESPONSE_SIZE {
-                debug!("Too large params response from server: {}", dst.len());
-                Ok(0)
-            } else {
-                Ok(data.len())
-            }
-        }).unwrap();
+        let (mut req, buf) = rself.make_request();
 
+        let ses = rself.session.borrow().clone().unwrap();
         let s = ses.perform(req)
             .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
             .and_then(move |mut rq| {
@@ -154,18 +159,22 @@
         */
     }
 
-    fn new(config: &Config, epoch: String, handle: &Handle) -> Self {
+    fn new(config: &Config, handle: &Handle) -> Self {
+        let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
+        rand::OsRng::new().unwrap().fill_bytes(&mut b);
+        let epoch = base64::encode(&b);
         ParamWaiter {
             limitlog: NotTooOften::new(LOG_MINUTES*60),
-            epoch_tag: RefCell::new(epoch),
+            last_etag: RefCell::new(String::new()),
             session: RefCell::new(None),
+            epoch: epoch,
             config: config.clone(),
             handle: handle.clone(),
         }
     }
 
-    pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
-        let rcself = Rc::new(ParamWaiter::new(config, epoch, handle));
+    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
+        let rcself = Rc::new(ParamWaiter::new(config, handle));
 
         let dur = Duration::from_millis(4000);
         let i = Interval::new(dur, &rcself.handle).unwrap()