// Source: rust/src/paramwaiter.rs @ 614:e1bab5b36352 (Mercurial web view)
//
// Commit message: using some refcells for the paramwaiter
// author: Matt Johnston <matt@ucc.asn.au>
// date: Tue, 07 Mar 2017 23:04:02 +0800
// parents: 5c2b0d47bb83
// children: (none)
// line wrap: on
// line source

extern crate tokio_core;
extern crate futures;
extern crate rand;
extern crate serde_json;

use std::time::Duration;
use std::io;
use std::str;
use std::rc::Rc;
use std::sync::{Arc,Mutex};
use std::error::Error;
use std::cell::{Cell,RefCell};

use tokio_core::reactor::Interval;
use tokio_core::reactor::Handle;
use tokio_curl::Session;
use futures::{Stream,Future,future};
use curl::easy::Easy;
use curl::easy;

use types::*;
use ::Config;

/// JSON body of a `200` reply from the settings server, as deserialized by
/// `ParamWaiter::handle_response`.
#[derive(Deserialize, Debug)]
struct Response {
    /// Tag for this version of the parameters. `handle_response` stores it and
    /// `make_request` sends it back in the `etag` header of the next request.
    epoch_tag: String,
    /// The parameter set carried by the reply.
    params: Params,
}

/// Fetches `Params` updates from the URL in `config.SETTINGS_URL`.
///
/// Mutable state lives in `RefCell`s so the waiter can be shared through an
/// `Rc` and still be updated from `&self` methods.
#[derive(Clone)]
pub struct ParamWaiter {
    // NOTE(review): initialised in `new` but never read in this file;
    // presumably meant to rate-limit log output — confirm.
    limitlog: NotTooOften,
    /// Last `epoch_tag` received from the server; empty until the first
    /// successful `200` response.
    epoch_tag: RefCell<String>,
    /// curl session, created on the first call to `step` and reused after that.
    session: RefCell<Option<Session>>,

    /// Copy of the application configuration (provides `SETTINGS_URL`).
    config: Config,
    /// Reactor handle used for the curl session and the polling interval.
    handle: Handle,
}

/// Minutes passed (converted to seconds) to `NotTooOften::new` for `limitlog`.
const LOG_MINUTES: u64 = 15;
/// Maximum number of response body bytes accepted from the server.
const MAX_RESPONSE_SIZE: usize = 10000;
/// Timeout, in minutes, applied to each settings request.
const TIMEOUT_MINUTES: u64 = 5;

impl ParamWaiter {
    /// Builds the GET request for `config.SETTINGS_URL`.
    ///
    /// The request times out after `TIMEOUT_MINUTES`. When an epoch tag has
    /// already been received it is sent back in an `etag: <tag>` header.
    ///
    /// # Panics
    ///
    /// Panics if any of the curl option setters returns an error.
    fn make_request(&self) -> Easy {
        let mut req = Easy::new();
        req.get(true).unwrap();
        // The original author's note: setting the URL is not expected to fail
        // here; validation presumably happens when the transfer is performed.
        req.url(&self.config.SETTINGS_URL).unwrap();

        req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();

        // http header
        //   etag: epoch-tag
        let tag = self.epoch_tag.borrow();
        if !tag.is_empty() {
            let mut headers = easy::List::new();
            headers.append(&format!("etag: {}", *tag)).unwrap();
            req.http_headers(headers).unwrap();
        }

        req
    }

    /// Interprets a completed transfer.
    ///
    /// * `200` — the body is parsed as a JSON `Response`; its `epoch_tag` is
    ///   remembered for the next request and its params are returned.
    /// * `304` — nothing changed; returns `Ok(None)`.
    /// * anything else — returns an error containing the status code and body.
    ///
    /// # Errors
    ///
    /// Fails if the response code cannot be read from `req`, if the body is not
    /// valid JSON for `Response`, or if the status is neither `200` nor `304`.
    fn handle_response(&self, req: &mut Easy, buf: &[u8]) -> Result<Option<Params>, TemplogError> {
        let resp = req.response_code()?;
        match resp {
            200 => {
                // new params
                let text = String::from_utf8_lossy(buf);
                let r: Response = serde_json::from_str(&text)?;
                *self.epoch_tag.borrow_mut() = r.epoch_tag;
                Ok(Some(r.params))
            }
            304 => Ok(None), // unmodified (long polling timeout at the server)
            _ => {
                // The body is only decoded where it is actually reported.
                let text = String::from_utf8_lossy(buf);
                Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
            },
        }
    }

    /// Performs one request/response cycle and resolves to the new params, or
    /// to `None` when the server reports no change.
    ///
    /// The curl session is created on first use and reused afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the response buffer mutex is poisoned or if installing the
    /// write callback fails.
    fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
        // Create the session lazily under a single mutable borrow; the borrow
        // ends with this block so `rself` can be moved into the closure below.
        let ses = {
            let mut slot = rself.session.borrow_mut();
            if slot.is_none() {
                *slot = Some(Session::new(rself.handle.clone()));
            }
            slot.clone().unwrap()
        };

        let mut req = rself.make_request();
        let buf = Arc::new(Mutex::new(Vec::new()));

        let dst = buf.clone();
        req.write_function(move |data| {
            let mut dst = dst.lock().unwrap();
            // Check the cap before buffering so the buffer never grows past
            // MAX_RESPONSE_SIZE.
            let total = dst.len() + data.len();
            if total > MAX_RESPONSE_SIZE {
                debug!("Too large params response from server: {}", total);
                // Reporting fewer bytes than were offered makes curl abort
                // the transfer.
                Ok(0)
            } else {
                dst.extend_from_slice(data);
                Ok(data.len())
            }
        }).unwrap();

        let s = ses.perform(req)
            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
            .and_then(move |mut rq| {
                let result = buf.lock().unwrap();
                rself.handle_response(&mut rq, &result)
            });
        Box::new(s)
    }

    /// Creates a waiter with no epoch tag and no session yet, holding its own
    /// clones of `config` and `handle`.
    pub fn new(config: &Config, handle: &Handle) -> Self {
        ParamWaiter {
            limitlog: NotTooOften::new(LOG_MINUTES*60),
            epoch_tag: RefCell::new(String::new()),
            session: RefCell::new(None),
            config: config.clone(),
            handle: handle.clone(),
        }
    }

    /// Returns a stream that yields each new `Params` received from the server.
    ///
    /// A 4000 ms interval drives the polling: every tick runs one `step`, and
    /// ticks whose `step` resolves to `None` (no change) are dropped.
    ///
    /// # Panics
    ///
    /// Panics if the interval timer cannot be created on `handle`.
    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
        let rcself = Rc::new(ParamWaiter::new(config, handle));

        let dur = Duration::from_millis(4000);
        let i = Interval::new(dur, &rcself.handle).unwrap()
            .map_err(|e| TemplogError::new_io("interval failed", e))
            .and_then(move |()| {
                Self::step(rcself.clone())
            })
            // throw away None params
            .filter_map(|p| p);
        Box::new(i)
    }
}