changeset 594:aff50ee77252 rust

rust working better now with streams and sinks.
author Matt Johnston <matt@ucc.asn.au>
date Wed, 04 Jan 2017 17:18:44 +0800
parents bf138339d20a
children e87655ed8429
files .hgignore rust/Cargo.lock rust/Cargo.toml rust/src/fridge.rs rust/src/main.rs rust/src/paramwaiter.rs rust/src/sensor.rs rust/src/types.rs
diffstat 8 files changed, 328 insertions(+), 111 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Tue Dec 27 00:51:28 2016 +0800
+++ b/.hgignore	Wed Jan 04 17:18:44 2017 +0800
@@ -1,1 +1,2 @@
 venv
+rust/target
--- a/rust/Cargo.lock	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/Cargo.lock	Wed Jan 04 17:18:44 2017 +0800
@@ -2,10 +2,20 @@
 name = "wort-templog"
 version = "0.1.0"
 dependencies = [
- "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)",
  "rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
- "tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "aho-corasick"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -19,8 +29,17 @@
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "env_logger"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "futures"
-version = "0.1.6"
+version = "0.1.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -51,15 +70,23 @@
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "memchr"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "mio"
-version = "0.6.1"
+version = "0.6.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
- "miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "miow 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)",
  "nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -68,7 +95,7 @@
 
 [[package]]
 name = "miow"
-version = "0.1.3"
+version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -111,6 +138,23 @@
 ]
 
 [[package]]
+name = "regex"
+version = "0.1.80"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
+ "regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "rustc-serialize"
 version = "0.3.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -139,18 +183,40 @@
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
-name = "tokio-core"
-version = "0.1.1"
+name = "thread-id"
+version = "2.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
- "futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "thread_local"
+version = "0.2.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "tokio-core"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
- "mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "mio 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
+name = "utf8-ranges"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "void"
 version = "1.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -175,24 +241,32 @@
 ]
 
 [metadata]
+"checksum aho-corasick 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ca972c2ea5f742bfce5687b9aef75506a764f61d37f8f649047846a9686ddb66"
 "checksum bitflags 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8dead7461c1127cf637931a1e50934eb6eee8bff2f74433ac7909e9afcee04a3"
 "checksum cfg-if 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de1e760d7b6535af4241fca8bd8adf68e2e7edacc6b29f5d399050c5e48cf88c"
-"checksum futures 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0bad0a2ac64b227fdc10c254051ae5af542cf19c9328704fd4092f7914196897"
+"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
+"checksum futures 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "177a82a61dd7e528022ce97f24e54b499dd2fee4d4646a0f283c5fb500dbfe20"
 "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
 "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
 "checksum libc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)" = "a51822fc847e7a8101514d1d44e354ba2ffa7d4c194dcab48870740e327cac70"
 "checksum log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ab83497bf8bf4ed2a74259c1c802351fcd67a65baa86394b6ba73c36f4838054"
-"checksum mio 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "410a1a0ff76f5a226f1e4e3ff1756128e65cd30166e39c3892283e2ac09d5b67"
-"checksum miow 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d5bfc6782530ac8ace97af10a540054a37126b63b0702ddaaa243b73b5745b9a"
+"checksum memchr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "d8b629fb514376c675b98c1421e80b151d3817ac42d7c667717d282761418d20"
+"checksum mio 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5b493dc9fd96bd2077f2117f178172b0765db4dfda3ea4d8000401e6d65d3e80"
+"checksum miow 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3e690c5df6b2f60acd45d56378981e827ff8295562fc8d34f573deb267a59cd1"
 "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
 "checksum nix 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a0d95c5fa8b641c10ad0b8887454ebaafa3c92b5cd5350f8fc693adafd178e7b"
 "checksum rand 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "022e0636ec2519ddae48154b028864bdce4eaf7d35226ab8e65c611be97b189d"
+"checksum regex 0.1.80 (registry+https://github.com/rust-lang/crates.io-index)" = "4fd4ace6a8cf7860714a2c2280d6c1f7e6a413486c13298bbc86fd3da019402f"
+"checksum regex-syntax 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "f9ec002c35e86791825ed294b50008eea9ddfc8def4420124fbc6b08db834957"
 "checksum rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "237546c689f20bb44980270c73c3b9edd0891c1be49cc1274406134a66d3957b"
 "checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084"
 "checksum scoped-tls 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f417c22df063e9450888a7561788e9bd46d3bb3c1466435b4eccb903807f147d"
 "checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac"
 "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
-"checksum tokio-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "06f40e15561569e24dab3dcf270c0bb950195b84dbed591dfb6591e28c9b9cff"
+"checksum thread-id 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a9539db560102d1cef46b8b78ce737ff0bb64e7e18d35b2a5688f7d097d0ff03"
+"checksum thread_local 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "8576dbbfcaef9641452d5cf0df9b0e7eeab7694956dd33bb61515fb8f18cfdd5"
+"checksum tokio-core 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "52416b3e937abac22a543a7f1c66bd37feb60137ff1ab42390fa02df85347e58"
+"checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"
 "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
 "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
 "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
--- a/rust/Cargo.toml	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/Cargo.toml	Wed Jan 04 17:18:44 2017 +0800
@@ -4,7 +4,12 @@
 authors = ["Matt Johnston <[email protected]>"]
 
 [dependencies]
-futures = "0.1.6"
-tokio-core = "0.1.1"
+futures = "0.1"
+tokio-core = "0.1"
 rustc-serialize = "0.3"
 rand = "0.3"
+log = "0.3"
+env_logger = "0.3"
+
+[features]
+testmode = []
--- a/rust/src/fridge.rs	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/src/fridge.rs	Wed Jan 04 17:18:44 2017 +0800
@@ -1,65 +1,131 @@
+extern crate futures;
+extern crate tokio_core;
+
+use std;
 use std::io;
+use std::mem;
 use std::time::Duration;
 
-use futures::Future;
-use tokio_core::reactor::Timeout;
-use tokio_core::reactor::Handle;
+use futures::{Future,future,Sink,Stream};
+use tokio_core::reactor::{Timeout,Handle};
+use futures::sync::{mpsc};
 
 use types::*;
 
+#[derive(Debug)]
+pub enum Message {
+    Sensor {wort: Option<f32>, fridge: Option<f32>},
+    Params (Params),
+    Tick(u64),
+}
+
 pub struct Fridge {
     params: Params,
     temp_wort: Option<f32>,
     temp_fridge: Option<f32>,
 
-    // timeouts to wake ourself up again
-    //overshoot_timeout: Option<Future<Item=(), Error=io::Error>>,
-    //fridgeoff_timeout: Option<Future<Item=(), Error=io::Error>>,
-    wortvalid_timeout: Option<Timeout>,
+    // Timeouts to wake ourselves up again
+    handle: Handle,
+    timeout_s: mpsc::Sender<u64>,
+    timeout_r: Option<mpsc::Receiver<u64>>,
+    ticker: u64,
+}
+
+impl Sink for Fridge {
+
+    type SinkItem = Message;
+    type SinkError = std::io::Error;
+
+    fn start_send(&mut self, msg: Message)
+            -> futures::StartSend<Self::SinkItem, Self::SinkError> {
+        self.process_msg(msg);
+        Ok(futures::AsyncSink::Ready)
+    }
+
+    fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
+        Ok(futures::Async::Ready(()))
+    }
 }
 
 impl Fridge {
-    pub fn new(p: Params) -> Fridge {
-        Fridge { 
+    pub fn new(p: Params, handle: &Handle) -> Fridge {
+        let (s, r) = mpsc::channel(1);
+        let mut f = Fridge { 
             params: p,
             temp_wort: None,
             temp_fridge: None,
-            //overshoot_timeout: None,
-            //fridgeoff_timeout: None,
-            wortvalid_timeout: None,
-        }
+
+            handle: handle.clone(),
+            timeout_s: s,
+            timeout_r: Some(r),
+            ticker: 0,
+        };
+        f.tick();
+        f
     }
 
-    fn tick(&mut self, handle: &Handle) {
+    /// Returns a stream of timeouts for fridge, waking when next necessary
+    pub fn timeouts(&mut self)
+            -> Box<Stream<Item=Message, Error=io::Error>> {
+        mem::replace(&mut self.timeout_r, None)
+            .expect("NumberWatcher::timeouts() can only be called once")
+            .map(|v| Message::Tick(v))
+            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel"))
+            .boxed()
+    }
 
+    fn next_wakeup(&self) -> Duration {
+        let millis = 400;
+        let dur = Duration::from_millis(millis);
+        dur
+    }
+
+    fn tick(&mut self) {
+        debug!("tick");
+
+        self.send_next_timeout();
     }
 
-    pub fn set_params(&mut self, handle: &Handle, p: Params) {
-        self.params = p;
-        println!("params {:?}", self.params);
-
-        self.tick(handle);
+    /// Sets the next self-wakeup timeout
+    fn send_next_timeout(&mut self) {
+        let waker = self.timeout_s.clone();
+        let dur = self.next_wakeup();
+        debug!("fridge next wakeup {:?}", dur);
+        self.ticker += 1;
+        let v = self.ticker;
+        let t = Timeout::new(dur, &self.handle).unwrap()
+            .map_err(|_| ())
+            .and_then(move |_| {
+                waker.send(v)
+                    .map_err(|_| ())
+            })
+            .map(|_| ());
+        self.handle.spawn(t);
     }
 
-    pub fn set_temps(&mut self, handle: &Handle, 
-                wort: Option<f32>, fridge: Option<f32>) {
+    fn process_msg(&mut self, msg: Message)
+            -> Box<Future<Item=(), Error=()>> {
+        debug!("process_msg {:?}", msg);
+        match msg {
+            Message::Sensor{wort, fridge} => self.update_sensor(wort, fridge),
+            Message::Params(p) => self.update_params(p),
+            Message::Tick(v) => if v == self.ticker {self.tick()},
+        };
+        future::ok::<(),()>(()).boxed()
+    }
+
+    pub fn update_params(&mut self, p: Params) {
+        self.params = p;
+        println!("fridge set_params {:?}", self.params);
+
+        self.tick();
+    }
+
+    pub fn update_sensor(&mut self, wort: Option<f32>, fridge: Option<f32>) {
         self.temp_wort = wort;
         self.temp_fridge = fridge;
 
-        if let Some(_) = self.temp_wort {
-            // set a new timeout, replacing any existing
-            let dur = Duration::new(10, 0); // XXX
-            let t = Timeout::new(dur, handle).unwrap();
-            /*
-            handle.spawn(t.and_then(|_| {
-                                self.tick(handle);
-                                Ok(())
-                            }).map_err(|x| ()));
-                            */
-            self.wortvalid_timeout = Some(t);
-        }
-
-        self.tick(handle);
+        self.tick();
     }
 
 }
--- a/rust/src/main.rs	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/src/main.rs	Wed Jan 04 17:18:44 2017 +0800
@@ -1,10 +1,16 @@
 extern crate tokio_core;
 extern crate futures;
 extern crate rustc_serialize;
+#[macro_use]
+extern crate log;
+extern crate env_logger;
+
+use std::io;
 
 use tokio_core::reactor::Core;
-use futures::{Future,Stream};
-use rustc_serialize::json;
+use futures::{Stream,Sink,Future};
+use futures::sync::{mpsc};
+use sensor::Sensor;
 
 mod sensor;
 mod fridge;
@@ -14,34 +20,50 @@
 use types::*;
 
 fn main() {
-    println!("Wort Templog");
+    env_logger::init().unwrap();
 
-    let mut paramh = ParamHolder::new();
-    let mut readings = Readings::new();
-    let mut fridge = fridge::Fridge::new(paramh.p.clone());
+    println!("Wort Templog");
+    debug!("debug log level");
 
     let mut core = Core::new().unwrap();
     let handle = core.handle();
 
-    let s = sensor::Sensor::run(&handle, 400, "sens1".to_string());
-    let w = paramwaiter::ParamWaiter::run(&handle, 3000);
+    let mut paramh = ParamHolder::new();
+    let mut fridge = fridge::Fridge::new(paramh.p, &handle);
+
+    let (sensor_s, sensor_r) = mpsc::channel(1);
+    let sensor_r = sensor_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with sensor_r channel"));
 
-    let h = s.for_each(move |r| {
-        readings.push(r);
-        println!("readings {:?}", readings);
-        Ok(())
+    let sensor_stream = if cfg!(feature = "testmode") {
+        sensor::TestSensor::stream(&handle)
+    } else {
+        sensor::OneWireSensor::stream(&handle)
+    };
+
+    // Send the sensors of interest to the fridge (sensor_s),
+    // while streaming them all to the web sender.
+    let s = sensor_stream.map(|r| {
+        debug!("sensors {:?}", r);
+        let t = sensor_s.clone().send(fridge::Message::Sensor{wort: r.wort(), fridge: r.fridge()})
+                    .map(|_| ())
+                    .map_err(|_| ());
+        handle.spawn(t);
+        r
     });
 
-    let j = w.for_each(move |p| {
-        fridge.set_params(&handle, p.clone());
-        paramh.p = p;
-        Ok(())
-    });
+    let param_stream = paramwaiter::ParamWaiter::stream(&handle);
+    let p = param_stream.map(|p| {
+            fridge::Message::Params(p)
+        });
+
+    let timeouts = fridge.timeouts();
 
-    handle.spawn(h.map_err(|x| ()));
-    handle.spawn(j.map_err(|x| ()));
+    let all_readings = s.for_each(|_| Ok(()));
+    let all_fridge = p.select(timeouts).select(sensor_r).forward(fridge)
+        .map(|_| () );
 
-    let forever = futures::empty::<(),()>();
-    core.run(forever);
+    let all = all_fridge.select(all_readings);
+
+    core.run(all).ok();
 }
 
--- a/rust/src/paramwaiter.rs	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/src/paramwaiter.rs	Wed Jan 04 17:18:44 2017 +0800
@@ -16,7 +16,6 @@
 impl ParamWaiter {
     fn step(&mut self) -> Params {
         let mut p = Params::defaults();
-        let mut rng = rand::thread_rng();
         p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
         p
     }
@@ -25,10 +24,10 @@
         ParamWaiter {}
     }
 
-    pub fn run(handle: &Handle, rate: u64) -> Box<Stream<Item=Params, Error=io::Error>> {
+    pub fn stream(handle: &Handle) -> Box<Stream<Item=Params, Error=io::Error>> {
         let mut s = ParamWaiter::new();
 
-        let dur = Duration::from_millis(rate);
+        let dur = Duration::from_millis(4000);
         Interval::new(dur, handle).unwrap().map(move |()| {
             s.step()
         }).boxed()
--- a/rust/src/sensor.rs	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/src/sensor.rs	Wed Jan 04 17:18:44 2017 +0800
@@ -3,37 +3,86 @@
 
 use std::time::Duration;
 use std::io;
+use std::fs::File;
+use std::io::Read;
 
 use tokio_core::reactor::Interval;
 use tokio_core::reactor::Handle;
 use futures::Stream;
 use types::*;
 
-pub struct Sensor {
-    current: f32,
-    suf: String,
+pub trait Sensor {
+    fn stream(handle: &Handle)
+        -> Box<Stream<Item=Readings, Error=io::Error>>;
+}
+
+pub struct OneWireSensor {
 }
 
-impl Sensor {
-    fn step(&mut self) -> Vec<Reading> {
-        let mut r = Vec::new();
-        self.current = self.current + 0.1;
-        r.push(Reading::new("aaa".to_string() + &self.suf, self.current));
-        r.push(Reading::new("b".to_string() + &self.suf, self.current/3.0));
-        r
+impl OneWireSensor {
+    fn new() -> OneWireSensor {
+        OneWireSensor {}
+        // todo
     }
 
-    fn new(suffix: String) -> Self {
-        Sensor { current: 22.0, suf: suffix }
+    fn step(&mut self) -> Readings {
+        let mut r = Readings::new();
+        r.add("ambient", Some(31.2));
+        r.add("wort_todo", Some(8.0));
+        debug!("sensor step {:?}", r);
+        r
     }
+}
 
-    pub fn run(handle: &Handle, rate: u64, suffix: String) -> Box<Stream<Item=Vec<Reading>, Error=io::Error>> {
-        let mut s = Sensor::new(suffix);
+impl Sensor for OneWireSensor {
+    fn stream(handle: &Handle)
+        -> Box<Stream<Item=Readings, Error=io::Error>> {
+        let mut s = OneWireSensor::new();
 
-        let dur = Duration::from_millis(rate);
+        let dur = Duration::from_millis(600);
         Interval::new(dur, handle).unwrap().map(move |()| {
             s.step()
         }).boxed()
     }
 }
 
+pub struct TestSensor {
+}
+
+impl TestSensor {
+    fn step(&mut self) -> Readings {
+        let mut r = Readings::new();
+        r.add("ambient", Some(31.2));
+        r.add("wort", Some(Self::try_read("test_wort.txt").unwrap_or_else(|_| 18.0)));
+        r.add("fridge", Some(Self::try_read("test_fridge.txt").unwrap_or_else(|_| 20.0)));
+        r
+    }
+
+    fn try_read(filename: &str) -> Result<f32, String> {
+        File::open(filename)
+            .map_err(|e| e.to_string())
+            .and_then(|mut f| {
+                let mut s = String::new();
+                f.read_to_string(&mut s)
+                    .map_err(|e| e.to_string())
+                    .map(|_| s)
+            })
+            .and_then(|s| {
+                    s.trim().parse::<f32>()
+                        .map_err(|e| e.to_string())
+            })
+    }
+}
+
+impl Sensor for TestSensor {
+    fn stream(handle: &Handle)
+        -> Box<Stream<Item=Readings, Error=io::Error>> {
+        let mut s = TestSensor {};
+
+        let dur = Duration::new(1,0);
+        Interval::new(dur, handle).unwrap().map(move |()| {
+            s.step()
+        }).boxed()
+    }
+}
+
--- a/rust/src/types.rs	Tue Dec 27 00:51:28 2016 +0800
+++ b/rust/src/types.rs	Wed Jan 04 17:18:44 2017 +0800
@@ -1,3 +1,6 @@
+use std::collections::HashMap;
+use std::time::Duration;
+
 #[derive(RustcDecodable, RustcEncodable, Debug, Clone)]
 pub struct Params {
     pub fridge_setpoint: f32,
@@ -41,42 +44,40 @@
 }
 
 #[derive(Debug)]
-pub struct Reading {
-    name: String,
-    value: Option<f32>,
-}
-
-impl Reading {
-    pub fn new(name: String, value: f32) -> Reading {
-        Reading { name: name, value: Some(value) }
-    }
-    pub fn new_none(name: String) -> Reading {
-        Reading { name: name, value: None }
-    }
-}
-
-#[derive(Debug)]
 pub struct Readings {
-    temps: Vec<Vec<Reading>>,
+    temps: HashMap<String, Option<f32>>,
 }
 
 impl Readings {
     pub fn new() -> Readings {
         Readings {
-            temps: Vec::new(),
+            temps: HashMap::new(),
         }
     }
 
-    pub fn push(&mut self, vals: Vec<Reading>) {
-        self.temps.push(vals);
+    pub fn add(&mut self, name: &str, v: Option<f32>) {
+        if let Some(prev) = self.temps.insert(name.to_string(), v) {
+            warn!("Replaced existing reading '{}' {:?} -> {:?}",
+                name, prev, v);
+        }
     }
 
     pub fn fridge(&self) -> Option<f32> {
-        unimplemented!();
+        if let Some(t) = self.temps.get("fridge") {
+            t.clone()
+        } else {
+            warn!("No fridge reading was added");
+            None
+        }
     }
 
     pub fn wort(&self) -> Option<f32> {
-        unimplemented!();
+        if let Some(t) = self.temps.get("wort") {
+            t.clone()
+        } else {
+            warn!("No wort reading was added");
+            None
+        }
     }
 }