Mercurial > templog
diff rust/src/paramwaiter.rs @ 611:f3e39e2107fd rust
still doesn't compile, improvements to TemplogError and tokio curl though
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 28 Feb 2017 22:58:47 +0800 |
parents | 7bda01659426 |
children | 5c2b0d47bb83 |
line wrap: on
line diff
--- a/rust/src/paramwaiter.rs Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/src/paramwaiter.rs Tue Feb 28 22:58:47 2017 +0800 @@ -1,65 +1,113 @@ extern crate tokio_core; extern crate futures; extern crate rand; +extern crate serde_json; use std::time::Duration; use std::io; +use std::str; +use std::rc::Rc; +use std::sync::{Arc,Mutex}; +use std::error::Error; use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; use tokio_curl::Session; use futures::{Stream,Future,future}; +use curl::easy::Easy; + use types::*; -use curl::Easy; use ::Config; +#[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, epoch_tag: String, session: Option<Session>, config: Config, + handle: Handle, } -const LOGMINUTES: u64 = 15; +const LOG_MINUTES: u64 = 15; +const MAX_RESPONSE_SIZE: usize = 10000; impl ParamWaiter { - pub fn new(config: &Config) -> Self { + pub fn new(config: &Config, handle: &Handle) -> Self { ParamWaiter { - limitlog: NotTooOften::new(LOGMINUTES*60), + limitlog: NotTooOften::new(LOG_MINUTES*60), epoch_tag: String::new(), session: None, config: config.clone(), + handle: handle.clone(), } } fn make_req(&self) -> Easy { let mut req = Easy::new(); req.get(true).unwrap(); - req.url(config.SETTINGS_URL); + // supposedly req.url won't fail, checking is later? + req.url(&self.config.SETTINGS_URL).unwrap(); + req } - fn step(&mut self, handle: &Handle) -> Box<Future<Item=Option<Params>, Error=io::Error>> { + fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { + let text = String::from_utf8_lossy(buf).to_string(); + let resp = req.response_code()?; + match resp { + 200 => Ok(Some(serde_json::from_str(&text)?)), // new params + 304 => Ok(None), // unmodified (long polling timeout at the server) + _ => { + Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) + }, + } + } + + fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { if self.session.is_none() { - self.session = Some(Session::new(handle.clone())) + self.session = Some(Session::new(self.handle.clone())) } - let req = self.make_req(); - /* - self.session.unwrap().perform(self.make_req()) - .and_then(||) - */ + let mut req = self.make_req(); + let buf = Arc::new(Mutex::new(Vec::new())); + let dst = buf.clone(); + req.write_function(move |data| { + let mut dst = dst.lock().unwrap(); + dst.extend_from_slice(data); + if dst.len() > MAX_RESPONSE_SIZE { + debug!("Too large params response from server: {}", dst.len()); + Ok(0) + } else { + Ok(data.len()) + } + }).unwrap(); + + // XXX too many clones + let se = self.clone(); + let s = self.clone().session.unwrap().perform(req) + .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) + .and_then(move |mut es| { + let result = buf.lock().unwrap(); + se.handle_response(&mut es, &result) + }); + Box::new(s) + + /* let mut p = Params::defaults(); p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); - future::ok(p).boxed() + future::ok(Some(p)).boxed() + */ } - pub fn stream(&mut self, handle: &Handle) -> Box<Stream<Item=Params, Error=io::Error>> { + pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> { let dur = Duration::from_millis(4000); - let i = Interval::new(dur, handle).unwrap() + let mut s = Rc::new(self.clone()); + let i = Interval::new(dur, &self.handle).unwrap() + .map_err(|e| TemplogError::new_io("interval failed", e)) .and_then(move |()| { - s.step() + let ss = Rc::get_mut(&mut s).unwrap(); + ss.step() }) // throw away None params .filter_map(|p| p);