view rust/src/fridge.rs @ 607:282fae1c12e4 rust

NotTooOften
author Matt Johnston <matt@ucc.asn.au>
date Fri, 17 Feb 2017 22:27:44 +0800
parents 3c1d37d78415
children 7bda01659426
line wrap: on
line source

extern crate futures;
extern crate tokio_core;

use std;
use std::io;
use std::mem;
use std::time::{Duration,Instant};

use futures::{Future,future,Sink,Stream};
use tokio_core::reactor::{Timeout,Handle};
use futures::sync::{mpsc};

use ::Config;
use types::*;

/// Events handled by the fridge controller (see `Fridge::process_msg`).
#[derive(Debug)]
pub enum Message {
    /// Latest temperature readings; either reading may be absent (`None`).
    Sensor {
        wort: Option<f32>,
        fridge: Option<f32>,
    },
    /// Replacement set of control parameters.
    Params(Params),
    /// Self-wakeup carrying the ticker value assigned when its timeout was scheduled.
    Tick(u64),
}

/// State of the fridge controller. It is fed `Message`s through its `Sink`
/// implementation and wakes itself up via timeouts on the reactor `handle`.
pub struct Fridge {
    // Current control parameters; replaced wholesale by update_params().
    params: Params,
    // Clone of the configuration passed to new().
    config: Config,
    // Most recent readings stored by update_sensor(); None until a reading
    // arrives or when a reading is missing.
    temp_wort: Option<f32>,
    temp_fridge: Option<f32>,

    // Timeouts to wake ourselves up again
    // Reactor handle used to create and spawn the timeout futures.
    handle: Handle,
    // Sending side of the wakeup channel; cloned into every scheduled timeout.
    timeout_s: mpsc::Sender<u64>,
    // Receiving side of the wakeup channel; taken (left as None) by wakeups().
    timeout_r: Option<mpsc::Receiver<u64>>,
    // Incremented for every scheduled timeout; process_msg() ignores Tick
    // messages whose value differs from this one.
    ticker: u64,
    // Initialised in new() to "now", or FRIDGE_DELAY earlier when `nowait` is set.
    // NOTE(review): presumably the time the fridge was last switched off — nothing
    // in the visible code updates it afterwards; confirm.
    last_off_time: Instant,
    // Refreshed by update_sensor() whenever a wort reading is present; starts out
    // FRIDGE_WORT_INVALID_TIME in the past.
    wort_valid_time: Instant,
}

impl Sink for Fridge {

    type SinkItem = Message;
    type SinkError = std::io::Error;

    fn start_send(&mut self, msg: Message)
            -> futures::StartSend<Self::SinkItem, Self::SinkError> {
        self.process_msg(msg);
        Ok(futures::AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
        Ok(futures::Async::Ready(()))
    }
}

impl Fridge {
    /// Creates a fridge controller and schedules its first self-wakeup.
    ///
    /// * `config` - configuration, cloned into the new controller.
    /// * `nowait` - when set, `last_off_time` starts `FRIDGE_DELAY` seconds
    ///   (plus 100 ns) in the past instead of at the current instant.
    /// * `p` - initial control parameters.
    /// * `handle` - reactor handle used for the wakeup timeouts.
    ///
    /// The wort reading starts out stale: `wort_valid_time` is placed
    /// `FRIDGE_WORT_INVALID_TIME` seconds (plus 100 ns) in the past.
    pub fn new(config: &Config, nowait: bool, p: Params, handle: &Handle) -> Fridge {
        let (tx, rx) = mpsc::channel(1);

        let off_base = Instant::now();
        let wort_valid_time =
            Instant::now() - Duration::new(config.FRIDGE_WORT_INVALID_TIME, 100);
        let last_off_time = if nowait {
            off_base - Duration::new(config.FRIDGE_DELAY, 100)
        } else {
            off_base
        };

        let mut fridge = Fridge {
            config: config.clone(),
            params: p,
            temp_wort: None,
            temp_fridge: None,

            handle: handle.clone(),
            timeout_s: tx,
            timeout_r: Some(rx),
            ticker: 0,
            last_off_time: last_off_time,
            wort_valid_time: wort_valid_time,
        };
        // Kick off the first wakeup timeout.
        fridge.tick();
        fridge
    }

    /// Delay until the next self-wakeup. Currently a fixed 400 ms,
    /// independent of the controller state.
    fn next_wakeup(&self) -> Duration {
        const WAKEUP_MILLIS: u64 = 400;
        Duration::from_millis(WAKEUP_MILLIS)
    }

    /// Returns the stream of `Message::Tick` wakeups the fridge sends to itself.
    ///
    /// The fridge needs to wake itself up periodically. Intended reasons for
    /// a wakeup include:
    ///
    ///  * overshoot calculation
    ///  * minimum fridge-off time
    ///  * invalid wort timeout
    ///
    /// The delay before each wakeup comes from `next_wakeup()`.
    ///
    /// # Panics
    ///
    /// Panics when called more than once, because the receiving end of the
    /// wakeup channel is moved out on the first call.
    pub fn wakeups(&mut self)
            -> Box<Stream<Item=Message, Error=io::Error>> {
        let receiver = mem::replace(&mut self.timeout_r, None)
            .expect("Fridge::wakeups() can only be called once");
        let ticks = receiver
            .map(Message::Tick)
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel"));
        ticks.boxed()
    }

    /// Must be called after every state change. At present it only logs and
    /// schedules the next self-wakeup through send_next_timeout(); no fridge
    /// on/off switching happens in this function.
    fn tick(&mut self) {
        debug!("tick");

        self.send_next_timeout();
    }

    /// Sets the next self-wakeup timeout
    fn send_next_timeout(&mut self) {
        let waker = self.timeout_s.clone();
        let dur = self.next_wakeup();
        debug!("fridge next wakeup {:?}", dur);
        self.ticker += 1;
        let v = self.ticker;
        let t = Timeout::new(dur, &self.handle).unwrap()
            .map_err(|_| ())
            .and_then(move |_| {
                waker.send(v)
                    .map_err(|e| {
                        warn!("Send error in tick(): {}", e.to_string());
                        ()
                    })
            })
            .map(|_| ());
        self.handle.spawn(t);
    }

    /// Dispatches one incoming message to the matching handler and returns
    /// a future that is already resolved to `Ok(())`.
    fn process_msg(&mut self, msg: Message)
            -> Box<Future<Item=(), Error=()>> {
        debug!("process_msg {:?}", msg);
        match msg {
            Message::Sensor { wort, fridge } => {
                self.update_sensor(wort, fridge);
            }
            Message::Params(new_params) => {
                self.update_params(new_params);
            }
            Message::Tick(seq) => {
                // Only the tick from the most recently scheduled timeout
                // matches the ticker; it re-arms the wakeup. Older ticks are
                // dropped because a newer timeout is still pending.
                if seq == self.ticker {
                    self.tick();
                }
            }
        }
        let done = future::ok::<(), ()>(());
        done.boxed()
    }

    /// Replaces the control parameters with `p`, prints them to stdout and
    /// runs `tick()` so the change takes effect.
    pub fn update_params(&mut self, p: Params) {
        println!("fridge set_params {:?}", p);
        self.params = p;

        self.tick();
    }

    /// Stores the latest temperature readings and runs `tick()`.
    ///
    /// A present wort reading also refreshes `wort_valid_time`; a missing one
    /// leaves the previous timestamp untouched.
    pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) {
        if wort.is_some() {
            self.wort_valid_time = Instant::now();
        }
        self.temp_wort = wort;
        self.temp_fridge = fridge;

        self.tick();
    }

}