view rust/src/fridge.rs @ 596:ca8102feaca6 rust

sensor takes config parameter
author Matt Johnston <matt@ucc.asn.au>
date Fri, 06 Jan 2017 22:04:10 +0800
parents e87655ed8429
children a440eafa84a9
line wrap: on
line source

extern crate futures;
extern crate tokio_core;

use std;
use std::io;
use std::mem;
use std::time::Duration;

use futures::{Future,future,Sink,Stream};
use tokio_core::reactor::{Timeout,Handle};
use futures::sync::{mpsc};

use types::*;

#[derive(Debug)]
pub enum Message {
    Sensor {wort: Option<f32>, fridge: Option<f32>},
    Params (Params),
    Tick(u64),
}

pub struct Fridge {
    params: Params,
    temp_wort: Option<f32>,
    temp_fridge: Option<f32>,

    // Timeouts to wake ourselves up again
    handle: Handle,
    timeout_s: mpsc::Sender<u64>,
    timeout_r: Option<mpsc::Receiver<u64>>,
    ticker: u64,
}

impl Sink for Fridge {

    type SinkItem = Message;
    type SinkError = std::io::Error;

    fn start_send(&mut self, msg: Message)
            -> futures::StartSend<Self::SinkItem, Self::SinkError> {
        self.process_msg(msg);
        Ok(futures::AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
        Ok(futures::Async::Ready(()))
    }
}

impl Fridge {
    pub fn new(p: Params, handle: &Handle) -> Fridge {
        let (s, r) = mpsc::channel(1);
        let mut f = Fridge { 
            params: p,
            temp_wort: None,
            temp_fridge: None,

            handle: handle.clone(),
            timeout_s: s,
            timeout_r: Some(r),
            ticker: 0,
        };
        f.tick();
        f
    }

    /// Returns a stream of timeouts for fridge, waking when next necessary
    pub fn timeouts(&mut self)
            -> Box<Stream<Item=Message, Error=io::Error>> {
        mem::replace(&mut self.timeout_r, None)
            .expect("NumberWatcher::timeouts() can only be called once")
            .map(|v| Message::Tick(v))
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel"))
            .boxed()
    }

    fn next_wakeup(&self) -> Duration {
        let millis = 400;
        let dur = Duration::from_millis(millis);
        dur
    }

    fn tick(&mut self) {
        debug!("tick");

        self.send_next_timeout();
    }

    /// Sets the next self-wakeup timeout
    fn send_next_timeout(&mut self) {
        let waker = self.timeout_s.clone();
        let dur = self.next_wakeup();
        debug!("fridge next wakeup {:?}", dur);
        self.ticker += 1;
        let v = self.ticker;
        let t = Timeout::new(dur, &self.handle).unwrap()
            .map_err(|_| ())
            .and_then(move |_| {
                waker.send(v)
                    .map_err(|e| {
                        warn!("Send error in tick(): {}", e.to_string());
                        ()
                    })
            })
            .map(|_| ());
        self.handle.spawn(t);
    }

    fn process_msg(&mut self, msg: Message)
            -> Box<Future<Item=(), Error=()>> {
        debug!("process_msg {:?}", msg);
        match msg {
            Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge),
            Message::Params(p) => self.update_params(p),
            Message::Tick(v) => if v == self.ticker {self.tick()},
        };
        future::ok::<(),()>(()).boxed()
    }

    pub fn update_params(&mut self, p: Params) {
        self.params = p;
        println!("fridge set_params {:?}", self.params);

        self.tick();
    }

    pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) {
        self.temp_wort = wort;
        self.temp_fridge = fridge;

        self.tick();
    }

}