Mercurial > templog
view rust/src/paramwaiter.rs @ 612:5fc41e0833b4 rust
fix TemplogError references
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Thu, 02 Mar 2017 00:06:03 +0800 |
parents | f3e39e2107fd |
children | 5c2b0d47bb83 |
line wrap: on
line source
extern crate tokio_core; extern crate futures; extern crate rand; extern crate serde_json; use std::time::Duration; use std::io; use std::str; use std::rc::Rc; use std::sync::{Arc,Mutex}; use std::error::Error; use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; use tokio_curl::Session; use futures::{Stream,Future,future}; use curl::easy::Easy; use types::*; use ::Config; #[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, epoch_tag: String, session: Option<Session>, config: Config, handle: Handle, } const LOG_MINUTES: u64 = 15; const MAX_RESPONSE_SIZE: usize = 10000; impl ParamWaiter { pub fn new(config: &Config, handle: &Handle) -> Self { ParamWaiter { limitlog: NotTooOften::new(LOG_MINUTES*60), epoch_tag: String::new(), session: None, config: config.clone(), handle: handle.clone(), } } fn make_req(&self) -> Easy { let mut req = Easy::new(); req.get(true).unwrap(); // supposedly req.url won't fail, checking is later? req.url(&self.config.SETTINGS_URL).unwrap(); req } fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { let text = String::from_utf8_lossy(buf).to_string(); let resp = req.response_code()?; match resp { 200 => Ok(Some(serde_json::from_str(&text)?)), // new params 304 => Ok(None), // unmodified (long polling timeout at the server) _ => { Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) }, } } fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { if self.session.is_none() { self.session = Some(Session::new(self.handle.clone())) } let mut req = self.make_req(); let buf = Arc::new(Mutex::new(Vec::new())); let dst = buf.clone(); req.write_function(move |data| { let mut dst = dst.lock().unwrap(); dst.extend_from_slice(data); if dst.len() > MAX_RESPONSE_SIZE { debug!("Too large params response from server: {}", dst.len()); Ok(0) } else { Ok(data.len()) } }).unwrap(); // XXX too many clones let se = self.clone(); let s = self.clone().session.unwrap().perform(req) .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) .and_then(move |mut es| { let result = buf.lock().unwrap(); se.handle_response(&mut es, &result) }); Box::new(s) /* let mut p = Params::defaults(); p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); future::ok(Some(p)).boxed() */ } pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> { let dur = Duration::from_millis(4000); let mut s = Rc::new(self.clone()); let i = Interval::new(dur, &self.handle).unwrap() .map_err(|e| TemplogError::new_io("interval failed", e)) .and_then(move |()| { let ss = Rc::get_mut(&mut s).unwrap(); ss.step() }) // throw away None params .filter_map(|p| p); Box::new(i) } }