Mercurial > templog
changeset 614:e1bab5b36352 rust
using some refcells for the paramwaiter
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 07 Mar 2017 23:04:02 +0800 |
parents | 5c2b0d47bb83 |
children | f153aec221be |
files | rust/src/main.rs rust/src/paramwaiter.rs |
diffstat | 2 files changed, 46 insertions(+), 31 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/src/main.rs Thu Mar 02 00:06:20 2017 +0800 +++ b/rust/src/main.rs Tue Mar 07 23:04:02 2017 +0800 @@ -71,7 +71,7 @@ r }); - let param_stream = paramwaiter::ParamWaiter::new(config, &handle).stream(); + let param_stream = paramwaiter::ParamWaiter::stream(config, &handle); let p = param_stream.map(|p| { fridge::Message::Params(p) });
--- a/rust/src/paramwaiter.rs Thu Mar 02 00:06:20 2017 +0800 +++ b/rust/src/paramwaiter.rs Tue Mar 07 23:04:02 2017 +0800 @@ -9,13 +9,14 @@ use std::rc::Rc; use std::sync::{Arc,Mutex}; use std::error::Error; -use std::cell::Cell; +use std::cell::{Cell,RefCell}; use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; use tokio_curl::Session; use futures::{Stream,Future,future}; use curl::easy::Easy; +use curl::easy; use types::*; use ::Config; @@ -29,31 +30,36 @@ #[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, - epoch_tag: String, - session: Option<Session>, + epoch_tag: RefCell<String>, + session: RefCell<Option<Session>>, + config: Config, handle: Handle, } const LOG_MINUTES: u64 = 15; const MAX_RESPONSE_SIZE: usize = 10000; +const TIMEOUT_MINUTES: u64 = 5; impl ParamWaiter { - pub fn new(config: &Config, handle: &Handle) -> Self { - ParamWaiter { - limitlog: NotTooOften::new(LOG_MINUTES*60), - epoch_tag: String::new(), - session: None, - config: config.clone(), - handle: handle.clone(), - } - } - - fn make_req(&self) -> Easy { + fn make_request(&self) -> Easy { let mut req = Easy::new(); req.get(true).unwrap(); // supposedly req.url won't fail, checking is later? req.url(&self.config.SETTINGS_URL).unwrap(); + + req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); + + // http header + // etag: epoch-tag + let e = self.epoch_tag.borrow(); + if !e.is_empty() { + let mut list = easy::List::new(); + let hd = format!("etag: {}", *e); + list.append(&hd).unwrap(); + req.http_headers(list).unwrap(); + } + req } @@ -64,8 +70,7 @@ 200 => { // new params let r: Response = serde_json::from_str(&text)?; - // XXX - //self.epoch_tag = r.epoch_tag; + *self.epoch_tag.borrow_mut() = r.epoch_tag; Ok(Some(r.params)) } 304 => Ok(None), // unmodified (long polling timeout at the server) @@ -75,12 +80,14 @@ } } - fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { - if self.session.is_none() { - self.session = Some(Session::new(self.handle.clone())) + fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { + + if rself.session.borrow().is_none() { + *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); } + let ses = rself.session.borrow().clone().unwrap(); - let mut req = self.make_req(); + let mut req = rself.make_request(); let buf = Arc::new(Mutex::new(Vec::new())); let dst = buf.clone(); @@ -95,13 +102,11 @@ } }).unwrap(); - // XXX too many clones - let se = self.clone(); - let s = self.clone().session.unwrap().perform(req) + let s = ses.perform(req) .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) - .and_then(move |mut es| { + .and_then(move |mut rq| { let result = buf.lock().unwrap(); - se.handle_response(&mut es, &result) + rself.handle_response(&mut rq, &result) }); Box::new(s) @@ -112,15 +117,25 @@ */ } - pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> { + pub fn new(config: &Config, handle: &Handle) -> Self { + ParamWaiter { + limitlog: NotTooOften::new(LOG_MINUTES*60), + epoch_tag: RefCell::new(String::new()), + session: RefCell::new(None), + config: config.clone(), + handle: handle.clone(), + } + } + + + pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { + let rcself = Rc::new(ParamWaiter::new(config, handle)); let dur = Duration::from_millis(4000); - let mut s = Rc::new(self.clone()); - let i = Interval::new(dur, &self.handle).unwrap() + let i = Interval::new(dur, &rcself.handle).unwrap() .map_err(|e| TemplogError::new_io("interval failed", e)) .and_then(move |()| { - let ss = Rc::get_mut(&mut s).unwrap(); - ss.step() + Self::step(rcself.clone()) }) // throw away None params .filter_map(|p| p);