view rust/src/paramwaiter.rs @ 611:f3e39e2107fd rust

still doesn't compile, improvements to TemplogError and tokio curl though
author Matt Johnston <matt@ucc.asn.au>
date Tue, 28 Feb 2017 22:58:47 +0800
parents 7bda01659426
children 5c2b0d47bb83
line wrap: on
line source

extern crate tokio_core;
extern crate futures;
extern crate rand;
extern crate serde_json;

use std::time::Duration;
use std::io;
use std::str;
use std::rc::Rc;
use std::sync::{Arc,Mutex};
use std::error::Error;

use tokio_core::reactor::Interval;
use tokio_core::reactor::Handle;
use tokio_curl::Session;
use futures::{Stream,Future,future};
use curl::easy::Easy;

use types::*;
use ::Config;

/// Fetches `Params` from the URL in `Config::SETTINGS_URL` using curl on a
/// tokio-core reactor.
///
/// The struct is `Clone`; `step()` and `stream()` both clone it so that the
/// futures/streams they return own their state.
#[derive(Clone)]
pub struct ParamWaiter {
    // Built in `new()` from `LOG_MINUTES*60`. Not read anywhere else in the
    // visible code; presumably throttles repeated log messages -- TODO confirm
    // against the `NotTooOften` definition.
    limitlog: NotTooOften,
    // Starts out empty in `new()` and is not read or written by any method
    // visible here.
    epoch_tag: String,
    // curl session, `None` until the first call to `step()` creates it.
    session: Option<Session>,
    // Private copy of the configuration; `make_req()` reads `SETTINGS_URL` from it.
    config: Config,
    // Reactor handle used to create the curl `Session` and the polling `Interval`.
    handle: Handle,
}

// Passed to `NotTooOften::new` as `LOG_MINUTES*60`, i.e. presumably converted
// to seconds -- TODO confirm the unit `NotTooOften::new` expects.
const LOG_MINUTES: u64 = 15;
// Size limit, in bytes, for a buffered server response: the curl write
// callback in `step()` returns `Ok(0)` instead of accepting more data once the
// body would grow past this.
const MAX_RESPONSE_SIZE: usize = 10000;

impl ParamWaiter {
    /// Creates a waiter for the settings URL found in `config`.
    ///
    /// Both `config` and `handle` are cloned, so the waiter keeps its own
    /// copies. No curl session is created here; `step()` does that lazily.
    pub fn new(config: &Config, handle: &Handle) -> Self {
        ParamWaiter {
            limitlog: NotTooOften::new(LOG_MINUTES*60),
            epoch_tag: String::new(),
            session: None,
            config: config.clone(),
            handle: handle.clone(),
        }
    }

    /// Builds a fresh GET request for `config.SETTINGS_URL`.
    ///
    /// # Panics
    ///
    /// Panics if curl rejects either of the two options.
    fn make_req(&self) -> Easy {
        let mut req = Easy::new();
        req.get(true).unwrap();
        // supposedly req.url won't fail, checking is later?
        req.url(&self.config.SETTINGS_URL).unwrap();
        req
    }

    /// Turns a completed request plus its buffered body into parameters.
    ///
    /// Returns `Ok(Some(params))` for a 200 response whose body parses as
    /// JSON, `Ok(None)` for a 304, and an error for a failed status lookup,
    /// a JSON parse failure or any other response code. The body is decoded
    /// lossily, so invalid UTF-8 never fails by itself.
    fn handle_response(&self, req: &mut Easy, buf: &[u8]) -> Result<Option<Params>, TemplogError> {
        let resp = req.response_code()?;
        // Stays a `Cow`: no copy is made when the body is already valid UTF-8.
        let text = String::from_utf8_lossy(buf);
        match resp {
            200 => Ok(Some(serde_json::from_str(&text)?)), // new params
            304 => Ok(None), // unmodified (long polling timeout at the server)
            _ => {
                Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
            },
        }
    }

    /// Performs one request against the settings URL.
    ///
    /// The returned future resolves to whatever `handle_response()` makes of
    /// the reply, or to a `TemplogError` if the transfer itself fails. The
    /// curl session is created on the first call and reused afterwards.
    ///
    /// # Panics
    ///
    /// Panics if curl refuses to install the write callback.
    fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
        if self.session.is_none() {
            self.session = Some(Session::new(self.handle.clone()))
        }

        let mut req = self.make_req();
        // Shared between the write callback (producer) and the completion
        // closure below (consumer).
        let buf = Arc::new(Mutex::new(Vec::new()));

        let dst = buf.clone();
        req.write_function(move |data| {
            let mut dst = dst.lock().unwrap();
            // Check before appending so the buffer never grows past the limit.
            let total = dst.len() + data.len();
            if total > MAX_RESPONSE_SIZE {
                debug!("Too large params response from server: {}", total);
                // Accepting fewer bytes than offered makes curl fail the transfer.
                Ok(0)
            } else {
                dst.extend_from_slice(data);
                Ok(data.len())
            }
        }).unwrap();

        // The boxed future must own everything it touches, so it gets its own
        // copy of the waiter for the `handle_response()` call. The session is
        // only borrowed here, which avoids cloning the whole struct twice.
        let se = self.clone();
        let perform = self.session.as_ref()
            .expect("session is initialised at the top of step()")
            .perform(req);
        let s = perform
            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
            .and_then(move |mut es| {
                let result = buf.lock().unwrap();
                se.handle_response(&mut es, &result)
            });
        Box::new(s)
    }

    /// Returns a stream that issues a request on every 4000 ms interval tick
    /// and yields each new set of parameters.
    ///
    /// Replies that carry no new parameters (`None` from `step()`) are
    /// dropped. The stream works on a clone of `self`, so the session it
    /// creates is not stored back into this instance.
    ///
    /// # Panics
    ///
    /// Panics if the interval timer cannot be created on the reactor.
    pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> {
        let dur = Duration::from_millis(4000);
        // Moved into the closure below, which is the only owner.
        let mut waiter = self.clone();
        let i = Interval::new(dur, &self.handle).unwrap()
            .map_err(|e| TemplogError::new_io("interval failed", e))
            .and_then(move |()| waiter.step())
            // throw away None params
            .filter_map(|p| p);
        Box::new(i)
    }
}