diff rust/src/params.rs @ 616:a85c0c9bc1fa rust

hide epoch in ParamWaiter make_request handles the buffer too
author Matt Johnston <matt@ucc.asn.au>
date Wed, 08 Mar 2017 23:08:14 +0800
parents f153aec221be
children 2d65a9f0bed3
line wrap: on
line diff
--- a/rust/src/params.rs	Tue Mar 07 23:56:12 2017 +0800
+++ b/rust/src/params.rs	Wed Mar 08 23:08:14 2017 +0800
@@ -2,6 +2,7 @@
 extern crate futures;
 extern crate rand;
 extern crate serde_json;
+extern crate base64;
 
 use std::time::Duration;
 use std::io;
@@ -17,6 +18,7 @@
 use futures::{Stream,Future,future};
 use curl::easy::Easy;
 use curl::easy;
+use self::rand::Rng;
 
 use types::*;
 use ::Config;
@@ -31,9 +33,6 @@
     pub nowort: bool,
     pub fridge_range_lower: f32,
     pub fridge_range_upper: f32,
-
-    #[serde(skip_serializing)]
-    pub epoch: String,
 }
 
 impl Params {
@@ -47,28 +46,33 @@
             nowort: false,
             fridge_range_lower: 3.0,
             fridge_range_upper: 3.0,
-            epoch: String::new(),
             }
     }
 
     pub fn load(config: &Config) -> Params {
         // generate random epoch on success.
         // TODO: return failure? or just return default() ?
+        let mut p = Params::defaults();
+
         unimplemented!();
     }
 }
 
 #[derive(Deserialize, Debug)]
 struct Response {
-    epoch_tag: String,
+    // sent as an opaque etag: header. Has format "epoch-nonce",
+    // responses where the epoch do not match ParamWaiter::epoch are dropped
+    etag: String,
     params: Params,
 }
 
 #[derive(Clone)]
 pub struct ParamWaiter {
     limitlog: NotTooOften,
-    epoch_tag: RefCell<String>,
+    // last_etag is used for long-polling.
+    last_etag: RefCell<String>,
     session: RefCell<Option<Session>>,
+    epoch: String,
 
     config: Config,
     handle: Handle,
@@ -79,17 +83,31 @@
 const TIMEOUT_MINUTES: u64 = 5;
 
 impl ParamWaiter {
-    fn make_request(&self) -> Easy {
+    fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) {
         let mut req = Easy::new();
         req.get(true).unwrap();
-        // supposedly req.url won't fail, checking is later?
+        // supposedly req.url won't fail, checking happens later?
         req.url(&self.config.SETTINGS_URL).unwrap();
 
         req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
 
+        // store response
+        let max_response_size = 10000;
+        let buf = Arc::new(Mutex::new(Vec::new()));
+        let dst = buf.clone();
+        req.write_function(move |data| {
+            let mut dst = dst.lock().unwrap();
+            dst.extend_from_slice(data);
+            if dst.len() > max_response_size {
+                error!("Too large params response from server: {}", dst.len());
+                Ok(0)
+            } else {
+                Ok(data.len())
+            }
+        }).unwrap();
+
         // http header
-        //   etag: epoch-tag
-        let e = self.epoch_tag.borrow();
+        let e = self.last_etag.borrow();
         if !e.is_empty() {
             let mut list = easy::List::new();
             let hd = format!("etag: {}", *e);
@@ -97,7 +115,7 @@
             req.http_headers(list).unwrap();
         }
 
-        req
+        (req, buf)
     }
 
     fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
@@ -107,7 +125,7 @@
             200 => {
                 // new params
                 let r: Response = serde_json::from_str(&text)?;
-                *self.epoch_tag.borrow_mut() = r.epoch_tag;
+                *self.last_etag.borrow_mut() = r.etag;
                 Ok(Some(r.params))
             }
             304 => Ok(None), // unmodified (long polling timeout at the server)
@@ -122,23 +140,10 @@
         if rself.session.borrow().is_none() {
             *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
         }
-        let ses = rself.session.borrow().clone().unwrap();
-
-        let mut req = rself.make_request();
-        let buf = Arc::new(Mutex::new(Vec::new()));
 
-        let dst = buf.clone();
-        req.write_function(move |data| {
-            let mut dst = dst.lock().unwrap();
-            dst.extend_from_slice(data);
-            if dst.len() > MAX_RESPONSE_SIZE {
-                debug!("Too large params response from server: {}", dst.len());
-                Ok(0)
-            } else {
-                Ok(data.len())
-            }
-        }).unwrap();
+        let (mut req, buf) = rself.make_request();
 
+        let ses = rself.session.borrow().clone().unwrap();
         let s = ses.perform(req)
             .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
             .and_then(move |mut rq| {
@@ -154,18 +159,22 @@
         */
     }
 
-    fn new(config: &Config, epoch: String, handle: &Handle) -> Self {
+    fn new(config: &Config, handle: &Handle) -> Self {
+        let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
+        rand::OsRng::new().unwrap().fill_bytes(&mut b);
+        let epoch = base64::encode(&b);
         ParamWaiter {
             limitlog: NotTooOften::new(LOG_MINUTES*60),
-            epoch_tag: RefCell::new(epoch),
+            last_etag: RefCell::new(String::new()),
             session: RefCell::new(None),
+            epoch: epoch,
             config: config.clone(),
             handle: handle.clone(),
         }
     }
 
-    pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
-        let rcself = Rc::new(ParamWaiter::new(config, epoch, handle));
+    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
+        let rcself = Rc::new(ParamWaiter::new(config, handle));
 
         let dur = Duration::from_millis(4000);
         let i = Interval::new(dur, &rcself.handle).unwrap()