diff rust/src/paramwaiter.rs @ 614:e1bab5b36352 rust

using some refcells for the paramwaiter
author Matt Johnston <matt@ucc.asn.au>
date Tue, 07 Mar 2017 23:04:02 +0800
parents 5c2b0d47bb83
children
line wrap: on
line diff
--- a/rust/src/paramwaiter.rs	Thu Mar 02 00:06:20 2017 +0800
+++ b/rust/src/paramwaiter.rs	Tue Mar 07 23:04:02 2017 +0800
@@ -9,13 +9,14 @@
 use std::rc::Rc;
 use std::sync::{Arc,Mutex};
 use std::error::Error;
-use std::cell::Cell;
+use std::cell::{Cell,RefCell};
 
 use tokio_core::reactor::Interval;
 use tokio_core::reactor::Handle;
 use tokio_curl::Session;
 use futures::{Stream,Future,future};
 use curl::easy::Easy;
+use curl::easy;
 
 use types::*;
 use ::Config;
@@ -29,31 +30,36 @@
 #[derive(Clone)]
 pub struct ParamWaiter {
     limitlog: NotTooOften,
-    epoch_tag: String,
-    session: Option<Session>,
+    epoch_tag: RefCell<String>,
+    session: RefCell<Option<Session>>,
+
     config: Config,
     handle: Handle,
 }
 
 const LOG_MINUTES: u64 = 15;
 const MAX_RESPONSE_SIZE: usize = 10000;
+const TIMEOUT_MINUTES: u64 = 5;
 
 impl ParamWaiter {
-    pub fn new(config: &Config, handle: &Handle) -> Self {
-        ParamWaiter {
-            limitlog: NotTooOften::new(LOG_MINUTES*60),
-            epoch_tag: String::new(),
-            session: None,
-            config: config.clone(),
-            handle: handle.clone(),
-        }
-    }
-
-    fn make_req(&self) -> Easy {
+    fn make_request(&self) -> Easy {
         let mut req = Easy::new();
         req.get(true).unwrap();
         // supposedly req.url won't fail, checking is later?
         req.url(&self.config.SETTINGS_URL).unwrap();
+
+        req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
+
+        // http header
+        //   etag: epoch-tag
+        let e = self.epoch_tag.borrow();
+        if !e.is_empty() {
+            let mut list = easy::List::new();
+            let hd = format!("etag: {}", *e);
+            list.append(&hd).unwrap();
+            req.http_headers(list).unwrap();
+        }
+
         req
     }
 
@@ -64,8 +70,7 @@
             200 => {
                 // new params
                 let r: Response = serde_json::from_str(&text)?;
-                // XXX
-                //self.epoch_tag = r.epoch_tag;
+                *self.epoch_tag.borrow_mut() = r.epoch_tag;
                 Ok(Some(r.params))
             }
             304 => Ok(None), // unmodified (long polling timeout at the server)
@@ -75,12 +80,14 @@
         }
     }
 
-    fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
-        if self.session.is_none() {
-            self.session = Some(Session::new(self.handle.clone()))
+    fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
+
+        if rself.session.borrow().is_none() {
+            *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
         }
+        let ses = rself.session.borrow().clone().unwrap();
 
-        let mut req = self.make_req();
+        let mut req = rself.make_request();
         let buf = Arc::new(Mutex::new(Vec::new()));
 
         let dst = buf.clone();
@@ -95,13 +102,11 @@
             }
         }).unwrap();
 
-        // XXX too many clones
-        let se = self.clone();
-        let s = self.clone().session.unwrap().perform(req)
+        let s = ses.perform(req)
             .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
-            .and_then(move |mut es| {
+            .and_then(move |mut rq| {
                 let result = buf.lock().unwrap();
-                se.handle_response(&mut es, &result)
+                rself.handle_response(&mut rq, &result)
             });
         Box::new(s)
 
@@ -112,15 +117,25 @@
         */
     }
 
-    pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> {
+    pub fn new(config: &Config, handle: &Handle) -> Self {
+        ParamWaiter {
+            limitlog: NotTooOften::new(LOG_MINUTES*60),
+            epoch_tag: RefCell::new(String::new()),
+            session: RefCell::new(None),
+            config: config.clone(),
+            handle: handle.clone(),
+        }
+    }
+
+
+    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
+        let rcself = Rc::new(ParamWaiter::new(config, handle));
 
         let dur = Duration::from_millis(4000);
-        let mut s = Rc::new(self.clone());
-        let i = Interval::new(dur, &self.handle).unwrap()
+        let i = Interval::new(dur, &rcself.handle).unwrap()
             .map_err(|e| TemplogError::new_io("interval failed", e))
             .and_then(move |()| {
-                let ss = Rc::get_mut(&mut s).unwrap();
-                ss.step()
+                Self::step(rcself.clone())
             })
             // throw away None params
             .filter_map(|p| p);