changeset 611:f3e39e2107fd rust

still doesn't compile, improvements to TemplogError and tokio curl though
author Matt Johnston <matt@ucc.asn.au>
date Tue, 28 Feb 2017 22:58:47 +0800
parents af0dac00d40b
children 5fc41e0833b4
files rust/Cargo.lock rust/Cargo.toml rust/src/config.rs rust/src/fridge.rs rust/src/main.rs rust/src/paramwaiter.rs rust/src/sensor.rs rust/src/types.rs
diffstat 8 files changed, 246 insertions(+), 41 deletions(-) [+]
line wrap: on
line diff
--- a/rust/Cargo.lock	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/Cargo.lock	Tue Feb 28 22:58:47 2017 +0800
@@ -14,6 +14,7 @@
  "rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
  "serde 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "serde_derive 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 0.9.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-core 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio-curl 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -83,6 +84,11 @@
 ]
 
 [[package]]
+name = "dtoa"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "env_logger"
 version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -124,6 +130,11 @@
 ]
 
 [[package]]
+name = "itoa"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "kernel32-sys"
 version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -217,6 +228,11 @@
 ]
 
 [[package]]
+name = "num-traits"
+version = "0.1.36"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "num_cpus"
 version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -332,6 +348,17 @@
 ]
 
 [[package]]
+name = "serde_json"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "slab"
 version = "0.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -497,11 +524,13 @@
 "checksum curl 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c90e1240ef340dd4027ade439e5c7c2064dd9dc652682117bd50d1486a3add7b"
 "checksum curl-sys 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d909dc402ae80b6f7b0118c039203436061b9d9a3ca5d2c2546d93e0a61aaa"
 "checksum docopt 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ab32ea6e284d87987066f21a9e809a73c14720571ef34516f0890b3d355ccfd8"
+"checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90"
 "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
 "checksum futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c1913eb7083840b1bbcbf9631b7fda55eaf35fe7ead13cca034e8946f9e2bc41"
 "checksum futures-cpupool 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9e48a3fff6a58fe9df1eed13d2599650416a987386c43a19aec656c3e6a2c229"
 "checksum gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)" = "c07c758b972368e703a562686adb39125707cc1ef3399da8c019fc6c2498a75d"
 "checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518"
+"checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c"
 "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
 "checksum lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6abe0ee2e758cd6bc8a2cd56726359007748fbf4128da998b65d0b70f881e19b"
 "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b"
@@ -513,6 +542,7 @@
 "checksum mio 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "eecdbdd49a849336e77b453f021c89972a2cfb5b51931a0026ae0ac4602de681"
 "checksum miow 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3a78d2605eb97302c10cf944b8d96b0a2a890c52957caf92fcd1f24f69049579"
 "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2"
+"checksum num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a16a42856a256b39c6d3484f097f6713e14feacd9bfb02290917904fae46c81c"
 "checksum num_cpus 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a225d1e2717567599c24f88e49f00856c6e825a12125181ee42c4257e3688d39"
 "checksum openssl-probe 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "756d49c8424483a3df3b5d735112b4da22109ced9a8294f1f5cdf80fb3810919"
 "checksum openssl-sys 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)" = "5dd48381e9e8a6dce9c4c402db143b2e243f5f872354532f7a009c289b3998ca"
@@ -529,6 +559,7 @@
 "checksum serde 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1e0ed773960f90a78567fcfbe935284adf50c5d7cf119aa2cf43bb0b4afa69bb"
 "checksum serde_codegen_internals 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c3172bf2940b975c0e4f6ab42a511c0a4407d4f46ccef87a9d3615db5c26fa96"
 "checksum serde_derive 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)" = "6af30425c5161deb200aac4803c62b903eb3be7e889c5823d0e16c4ce0ce989c"
+"checksum serde_json 0.9.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6501ac6f8b74f9b1033f7ddf79a08edfa0f58d6f8e3190cb8dc97736afa257a8"
 "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
 "checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694"
 "checksum syn 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f4f94368aae82bb29656c98443a7026ca931a659e8d19dcdc41d6e273054e820"
--- a/rust/Cargo.toml	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/Cargo.toml	Tue Feb 28 22:58:47 2017 +0800
@@ -20,3 +20,5 @@
 tokio-curl = "0.1"
 toml = "0.3"
 curl = "0.4"
+serde_json = "0.9"
+
--- a/rust/src/config.rs	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/src/config.rs	Tue Feb 28 22:58:47 2017 +0800
@@ -5,6 +5,8 @@
 use std::io::Read;
 use serde::{Serialize,Deserialize,Deserializer,Serializer};
 
+use types::*;
+
 #[derive(Deserialize,Serialize,Debug,Clone)]
 #[allow(non_snake_case)]
 pub struct Config {
@@ -69,7 +71,7 @@
         toml::to_string(self).unwrap()
     }
 
-    pub fn merge(&self, conf: &str) -> Result<Self, Box<Error>> {
+    pub fn merge(&self, conf: &str) -> Result<Self, TemplogError> {
         // convert existing and new toml into tables, combine them.
         let mut new_toml = toml::from_str(conf)?;
         let mut ex_val = toml::Value::try_from(self).unwrap();
@@ -80,7 +82,7 @@
         Ok(ret)
     }
 
-    pub fn merge_file(&self, filename: &str) -> Result<Self, Box<Error>> {
+    pub fn merge_file(&self, filename: &str) -> Result<Self, TemplogError> {
 
         let mut s = String::new();
         File::open(filename)?.read_to_string(&mut s)?;
--- a/rust/src/fridge.rs	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/src/fridge.rs	Tue Feb 28 22:58:47 2017 +0800
@@ -4,6 +4,7 @@
 use std;
 use std::io;
 use std::mem;
+use std::error::Error;
 use std::time::{Duration,Instant};
 
 use futures::{Future,future,Sink,Stream};
@@ -38,7 +39,7 @@
 impl Sink for Fridge {
 
     type SinkItem = Message;
-    type SinkError = std::io::Error;
+    type SinkError = TemplogError;
 
     fn start_send(&mut self, msg: Message)
             -> futures::StartSend<Self::SinkItem, Self::SinkError> {
@@ -89,11 +90,11 @@
     ///  * invalid wort timeout
     /// All specified in next_wakeup()
     pub fn wakeups(&mut self)
-            -> Box<Stream<Item=Message, Error=io::Error>> {
+            -> Box<Stream<Item=Message, Error=TemplogError>> {
         mem::replace(&mut self.timeout_r, None)
             .expect("Fridge::wakeups() can only be called once")
             .map(|v| Message::Tick(v))
-            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel"))
+            .map_err(|e| TemplogError::new("wakeups() receive failed"))
             .boxed()
     }
 
--- a/rust/src/main.rs	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/src/main.rs	Tue Feb 28 22:58:47 2017 +0800
@@ -7,6 +7,7 @@
 extern crate time;
 extern crate tokio_curl;
 extern crate curl;
+extern crate serde_json;
 
 #[macro_use] 
 extern crate lazy_static;
@@ -44,7 +45,7 @@
     let mut fridge = fridge::Fridge::new(&config, nowait, paramh.p, &handle);
 
     let (fridge_reading_s, fridge_reading_r) = mpsc::channel(1);
-    let fridge_reading_r = fridge_reading_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with fridge_reading_r channel"));
+    let fridge_reading_r = fridge_reading_r.map_err(|e| TemplogError::new("Problem with fridge_reading_r channel"));
 
     let sensor_stream = if testmode {
         sensor::TestSensor::new(config).stream(&handle)
@@ -70,7 +71,7 @@
         r
     });
 
-    let param_stream = paramwaiter::ParamWaiter::stream(&handle);
+    let param_stream = paramwaiter::ParamWaiter::new(config, &handle).stream();
     let p = param_stream.map(|p| {
             fridge::Message::Params(p)
         });
--- a/rust/src/paramwaiter.rs	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/src/paramwaiter.rs	Tue Feb 28 22:58:47 2017 +0800
@@ -1,65 +1,113 @@
 extern crate tokio_core;
 extern crate futures;
 extern crate rand;
+extern crate serde_json;
 
 use std::time::Duration;
 use std::io;
+use std::str;
+use std::rc::Rc;
+use std::sync::{Arc,Mutex};
+use std::error::Error;
 
 use tokio_core::reactor::Interval;
 use tokio_core::reactor::Handle;
 use tokio_curl::Session;
 use futures::{Stream,Future,future};
+use curl::easy::Easy;
+
 use types::*;
-use curl::Easy;
 use ::Config;
 
+#[derive(Clone)]
 pub struct ParamWaiter {
     limitlog: NotTooOften,
     epoch_tag: String,
     session: Option<Session>,
     config: Config,
+    handle: Handle,
 }
 
-const LOGMINUTES: u64 = 15;
+const LOG_MINUTES: u64 = 15;
+const MAX_RESPONSE_SIZE: usize = 10000;
 
 impl ParamWaiter {
-    pub fn new(config: &Config) -> Self {
+    pub fn new(config: &Config, handle: &Handle) -> Self {
         ParamWaiter {
-            limitlog: NotTooOften::new(LOGMINUTES*60),
+            limitlog: NotTooOften::new(LOG_MINUTES*60),
             epoch_tag: String::new(),
             session: None,
             config: config.clone(),
+            handle: handle.clone(),
         }
     }
 
     fn make_req(&self) -> Easy {
         let mut req = Easy::new();
         req.get(true).unwrap();
-        req.url(config.SETTINGS_URL);
+        // supposedly req.url won't fail, checking is later?
+        req.url(&self.config.SETTINGS_URL).unwrap();
+        req
     }
 
-    fn step(&mut self, handle: &Handle) -> Box<Future<Item=Option<Params>, Error=io::Error>> {
+    fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
+        let text = String::from_utf8_lossy(buf).to_string();
+        let resp = req.response_code()?;
+        match resp {
+            200 => Ok(Some(serde_json::from_str(&text)?)), // new params
+            304 => Ok(None), // unmodified (long polling timeout at the server)
+            _ => {
+                Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
+            },
+        }
+    }
+
+    fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
         if self.session.is_none() {
-            self.session = Some(Session::new(handle.clone()))
+            self.session = Some(Session::new(self.handle.clone()))
         }
 
-        let req = self.make_req();
-        /*
-        self.session.unwrap().perform(self.make_req())
-            .and_then(||)
-            */
+        let mut req = self.make_req();
+        let buf = Arc::new(Mutex::new(Vec::new()));
 
+        let dst = buf.clone();
+        req.write_function(move |data| {
+            let mut dst = dst.lock().unwrap();
+            dst.extend_from_slice(data);
+            if dst.len() > MAX_RESPONSE_SIZE {
+                debug!("Too large params response from server: {}", dst.len());
+                Ok(0)
+            } else {
+                Ok(data.len())
+            }
+        }).unwrap();
+
+        // XXX too many clones
+        let se = self.clone();
+        let s = self.clone().session.unwrap().perform(req)
+            .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
+            .and_then(move |mut es| {
+                let result = buf.lock().unwrap();
+                se.handle_response(&mut es, &result)
+            });
+        Box::new(s)
+
+            /*
         let mut p = Params::defaults();
         p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
-        future::ok(p).boxed()
+        future::ok(Some(p)).boxed()
+        */
     }
 
-    pub fn stream(&mut self, handle: &Handle) -> Box<Stream<Item=Params, Error=io::Error>> {
+    pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> {
 
         let dur = Duration::from_millis(4000);
-        let i = Interval::new(dur, handle).unwrap()
+        let mut s = Rc::new(self.clone());
+        let i = Interval::new(dur, &self.handle).unwrap()
+            .map_err(|e| TemplogError::new_io("interval failed", e))
             .and_then(move |()| {
-                s.step()
+                let ss = Rc::get_mut(&mut s).unwrap();
+                ss.step()
             })
             // throw away None params
             .filter_map(|p| p);
--- a/rust/src/sensor.rs	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/src/sensor.rs	Tue Feb 28 22:58:47 2017 +0800
@@ -19,8 +19,7 @@
 use ::Config;
 
 pub trait Sensor {
-    fn stream(&self, handle: &Handle)
-        -> Box<Stream<Item=Readings, Error=io::Error>>;
+    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>>;
 }
 
 #[derive(Clone)]
@@ -51,7 +50,7 @@
         r
     }
 
-    fn read_sensor(&self, n: &str) -> Result<f32, Box<Error>> {
+    fn read_sensor(&self, n: &str) -> Result<f32, TemplogError> {
         lazy_static! {
             // multiline
             static ref THERM_RE: regex::Regex = regex::Regex::new("(?m).* YES\n.*t=(.*)\n").unwrap();
@@ -71,7 +70,7 @@
         Ok(f32::from_str(v)?)
     }
 
-    fn sensor_names(&self) -> Result<Vec<String>, Box<Error>> {
+    fn sensor_names(&self) -> Result<Vec<String>, TemplogError> {
         // TODO: needs to handle multiple busses.
         let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR);
         path.push("w1_master_slaves");
@@ -84,17 +83,18 @@
 
 impl Sensor for OneWireSensor {
 
-    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=io::Error>> {
+    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
         let pool = futures_cpupool::CpuPool::new(4); // TODO: how many?
 
         let dur = Duration::new(self.config.SENSOR_SLEEP,0);
         let s = Arc::new(self.clone());
-        Interval::new(dur, handle).unwrap()
+        let i = Interval::new(dur, handle).unwrap()
+            .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
             .and_then(move |()| {
                 let a = s.clone();
                 pool.spawn_fn(move || Ok(a.step()))
-            })
-            .boxed()
+            });
+        consume_errors(i)
     }
 }
 
@@ -118,7 +118,7 @@
         r
     }
 
-    fn try_read(filename: &str) -> Result<f32, Box<Error>> {
+    fn try_read(filename: &str) -> Result<f32, TemplogError> {
         let mut s = String::new();
         File::open(filename)?.read_to_string(&mut s)?;
         Ok(s.trim().parse::<f32>()?)
@@ -126,14 +126,15 @@
 }
 
 impl Sensor for TestSensor {
-    fn stream(&self, handle: &Handle)
-        -> Box<Stream<Item=Readings, Error=io::Error>> {
+    fn stream(&self, handle: &Handle) -> Box<Stream<Item=Readings, Error=TemplogError>> {
 
         let dur = Duration::new(self.config.SENSOR_SLEEP,0);
-        Interval::new(dur, handle).unwrap()
+        let i = Interval::new(dur, handle).unwrap()
+            .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e))
             .and_then(move |()| {
                 Ok(Self::test_step())
-            }).boxed()
+            });
+        consume_errors(i)
     }
 }
 
--- a/rust/src/types.rs	Thu Feb 23 23:27:09 2017 +0800
+++ b/rust/src/types.rs	Tue Feb 28 22:58:47 2017 +0800
@@ -2,11 +2,17 @@
 use std::time::{Duration,Instant};
 use std::error::Error;
 use std::fmt;
+use std::io;
 use std::cmp;
 use std::cell::Cell;
 use std::iter::FromIterator;
+use std;
 
+use futures::{Stream,IntoFuture};
 use serde::{Deserialize,Serialize};
+use toml;
+use curl;
+use serde_json;
 
 #[derive(Deserialize, Serialize, Debug)]
 pub struct Params {
@@ -77,34 +83,128 @@
 }
 
 #[derive(Debug)]
+pub enum TemplogErrorKind {
+    None,
+    Io(io::Error),
+    ParseFloat(std::num::ParseFloatError),
+    TomlDe(toml::de::Error),
+    SerdeJson(serde_json::Error),
+    Curl(curl::Error),
+}
+
+#[derive(Debug)]
 pub struct TemplogError {
     desc: String,
+    kind: TemplogErrorKind,
 }
 
 impl Error for TemplogError {
-    fn description(&self) -> &str { &self.desc }
-    fn cause(&self) -> Option<&Error> { None }
+    fn description(&self) -> &str { 
+        &format!("{}", self)
+    }
+
+    fn cause(&self) -> Option<&Error> { 
+        match self.kind {
+            TemplogErrorKind::None => None,
+            TemplogErrorKind::Io(e) => Some(&e),
+            TemplogErrorKind::ParseFloat(e) => Some(&e),
+            TemplogErrorKind::TomlDe(e) => Some(&e),
+            TemplogErrorKind::SerdeJson(e) => Some(&e),
+            TemplogErrorKind::Curl(e) => Some(&e),
+        }
+    }
 }
 
 impl fmt::Display for TemplogError {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "TemplogError: {}", self.desc);
+        match self.kind {
+            TemplogErrorKind::None => write!(f, "Templog Error: {}", self.desc),
+            TemplogErrorKind::Io(e) => write!(f, "Templog IO error {}: {}", self.desc, e),
+            TemplogErrorKind::TomlDe(e) => write!(f, "Templog toml error {}: {}", self.desc, e),
+            TemplogErrorKind::SerdeJson(e) => write!(f, "Json decode error {}: {}", self.desc, e),
+            TemplogErrorKind::ParseFloat(e) => write!(f, "Templog parse error {}: {}", self.desc, e),
+            TemplogErrorKind::Curl(e) => write!(f, "Templog curl http error {}: {}", self.desc, e),
+        };
         Ok(())
     }
-
-
 }
 
 impl TemplogError {
     pub fn new(desc: &str) -> Self {
         TemplogError { 
             desc: desc.to_string(),
+            kind: TemplogErrorKind::None,
+        }
+    }
+
+    pub fn new_io(desc: &str, e: io::Error) -> Self {
+        TemplogError { 
+            desc: desc.to_string(),
+            kind: TemplogErrorKind::Io(e),
+        }
+    }
+
+    pub fn new_toml_de(desc: &str, e: toml::de::Error) -> Self {
+        TemplogError { 
+            desc: desc.to_string(),
+            kind: TemplogErrorKind::TomlDe(e),
+        }
+    }
+
+    pub fn new_parse_float(desc: &str, e: std::num::ParseFloatError) -> Self {
+        TemplogError { 
+            desc: desc.to_string(),
+            kind: TemplogErrorKind::ParseFloat(e),
+        }
+    }
+
+    pub fn new_curl(desc: &str, e: curl::Error) -> Self {
+        TemplogError { 
+            desc: desc.to_string(),
+            kind: TemplogErrorKind::Curl(e),
+        }
+    }
+
+    pub fn new_serde_json(desc: &str, e: serde_json::Error) -> Self {
+        TemplogError { 
+            desc: desc.to_string(),
+            kind: TemplogErrorKind::SerdeJson(e),
         }
     }
 }
 
+impl From<io::Error> for TemplogError {
+    fn from(e: io::Error) -> Self {
+        TemplogError::new_io("", e)
+    }
+}
+
+impl From<toml::de::Error> for TemplogError {
+    fn from(e: toml::de::Error) -> Self {
+        TemplogError::new_toml_de("", e)
+    }
+}
+
+impl From<std::num::ParseFloatError> for TemplogError {
+    fn from(e: std::num::ParseFloatError) -> Self {
+        TemplogError::new_parse_float("", e)
+    }
+}
+
+impl From<curl::Error> for TemplogError {
+    fn from(e: curl::Error) -> Self {
+        TemplogError::new_curl("", e)
+    }
+}
+
+impl From<serde_json::Error> for TemplogError {
+    fn from(e: serde_json::Error) -> Self {
+        TemplogError::new_serde_json("", e)
+    }
+}
 
 /// Call closures with a rate limit. Useful for log message ratelimiting
+#[derive(Clone)]
 pub struct NotTooOften {
     last: Cell<Instant>,
     limit: Duration,
@@ -200,3 +300,22 @@
     }
 }
 
+/// Takes a stream and returns a stream without errors.
+pub fn consume_errors<S>(s: S) -> Box<Stream<Item=S::Item, Error=S::Error>> 
+// XXX not sure why 'static is really necessary here?
+    where 
+    S: Stream+Send+'static, 
+    <S as Stream>::Error: std::fmt::Display+Send+'static,
+    <S as Stream>::Item: Send+'static,
+    {
+    s.then(|r| {
+        match r {
+            Ok(v) => Ok(Some(v)),
+            Err(e) => {
+                debug!("Stream error: {}", e);
+                Ok(None)
+            }
+        }
+    })
+    .filter_map(|p| p).boxed()
+}