Mercurial > templog
view rust/src/paramwaiter.rs @ 614:e1bab5b36352 rust
using some refcells for the paramwaiter
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 07 Mar 2017 23:04:02 +0800 |
parents | 5c2b0d47bb83 |
children |
line wrap: on
line source
extern crate tokio_core; extern crate futures; extern crate rand; extern crate serde_json; use std::time::Duration; use std::io; use std::str; use std::rc::Rc; use std::sync::{Arc,Mutex}; use std::error::Error; use std::cell::{Cell,RefCell}; use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; use tokio_curl::Session; use futures::{Stream,Future,future}; use curl::easy::Easy; use curl::easy; use types::*; use ::Config; #[derive(Deserialize, Debug)] struct Response { epoch_tag: String, params: Params, } #[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, epoch_tag: RefCell<String>, session: RefCell<Option<Session>>, config: Config, handle: Handle, } const LOG_MINUTES: u64 = 15; const MAX_RESPONSE_SIZE: usize = 10000; const TIMEOUT_MINUTES: u64 = 5; impl ParamWaiter { fn make_request(&self) -> Easy { let mut req = Easy::new(); req.get(true).unwrap(); // supposedly req.url won't fail, checking is later? req.url(&self.config.SETTINGS_URL).unwrap(); req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); // http header // etag: epoch-tag let e = self.epoch_tag.borrow(); if !e.is_empty() { let mut list = easy::List::new(); let hd = format!("etag: {}", *e); list.append(&hd).unwrap(); req.http_headers(list).unwrap(); } req } fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { let text = String::from_utf8_lossy(buf).to_string(); let resp = req.response_code()?; match resp { 200 => { // new params let r: Response = serde_json::from_str(&text)?; *self.epoch_tag.borrow_mut() = r.epoch_tag; Ok(Some(r.params)) } 304 => Ok(None), // unmodified (long polling timeout at the server) _ => { Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) }, } } fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { if rself.session.borrow().is_none() { *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); } let ses = rself.session.borrow().clone().unwrap(); let mut req = rself.make_request(); let buf = Arc::new(Mutex::new(Vec::new())); let dst = buf.clone(); req.write_function(move |data| { let mut dst = dst.lock().unwrap(); dst.extend_from_slice(data); if dst.len() > MAX_RESPONSE_SIZE { debug!("Too large params response from server: {}", dst.len()); Ok(0) } else { Ok(data.len()) } }).unwrap(); let s = ses.perform(req) .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) .and_then(move |mut rq| { let result = buf.lock().unwrap(); rself.handle_response(&mut rq, &result) }); Box::new(s) /* let mut p = Params::defaults(); p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); future::ok(Some(p)).boxed() */ } pub fn new(config: &Config, handle: &Handle) -> Self { ParamWaiter { limitlog: NotTooOften::new(LOG_MINUTES*60), epoch_tag: RefCell::new(String::new()), session: RefCell::new(None), config: config.clone(), handle: handle.clone(), } } pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { let rcself = Rc::new(ParamWaiter::new(config, handle)); let dur = Duration::from_millis(4000); let i = Interval::new(dur, &rcself.handle).unwrap() .map_err(|e| TemplogError::new_io("interval failed", e)) .and_then(move |()| { Self::step(rcself.clone()) }) // throw away None params .filter_map(|p| p); Box::new(i) } }