# HG changeset patch # User Matt Johnston # Date 1488293927 -28800 # Node ID f3e39e2107fd8ccf3906cc18b2bc8a326094a788 # Parent af0dac00d40b30dbf73eb02eb057c625f176758b still doesn't compile, improvements to TemplogError and tokio curl though diff -r af0dac00d40b -r f3e39e2107fd rust/Cargo.lock --- a/rust/Cargo.lock Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/Cargo.lock Tue Feb 28 22:58:47 2017 +0800 @@ -14,6 +14,7 @@ "rustc-serialize 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", "serde 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 0.9.8 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-curl 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -83,6 +84,11 @@ ] [[package]] +name = "dtoa" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "env_logger" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -124,6 +130,11 @@ ] [[package]] +name = "itoa" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "kernel32-sys" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -217,6 +228,11 @@ ] [[package]] +name = "num-traits" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "num_cpus" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -332,6 +348,17 @@ ] [[package]] +name = "serde_json" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "slab" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -497,11 +524,13 @@ "checksum curl 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c90e1240ef340dd4027ade439e5c7c2064dd9dc652682117bd50d1486a3add7b" "checksum curl-sys 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d909dc402ae80b6f7b0118c039203436061b9d9a3ca5d2c2546d93e0a61aaa" "checksum docopt 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ab32ea6e284d87987066f21a9e809a73c14720571ef34516f0890b3d355ccfd8" +"checksum dtoa 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "80c8b71fd71146990a9742fc06dcbbde19161a267e0ad4e572c35162f4578c90" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum futures 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "c1913eb7083840b1bbcbf9631b7fda55eaf35fe7ead13cca034e8946f9e2bc41" "checksum futures-cpupool 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9e48a3fff6a58fe9df1eed13d2599650416a987386c43a19aec656c3e6a2c229" "checksum gcc 0.3.43 (registry+https://github.com/rust-lang/crates.io-index)" = "c07c758b972368e703a562686adb39125707cc1ef3399da8c019fc6c2498a75d" "checksum gdi32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0912515a8ff24ba900422ecda800b52f4016a56251922d397c576bf92c690518" +"checksum itoa 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "eb2f404fbc66fd9aac13e998248505e7ecb2ad8e44ab6388684c5fb11c6c251c" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" "checksum lazy_static 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6abe0ee2e758cd6bc8a2cd56726359007748fbf4128da998b65d0b70f881e19b" "checksum lazycell 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce12306c4739d86ee97c23139f3a34ddf0387bbf181bc7929d287025a8c3ef6b" @@ -513,6 +542,7 @@ "checksum mio 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "eecdbdd49a849336e77b453f021c89972a2cfb5b51931a0026ae0ac4602de681" "checksum miow 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3a78d2605eb97302c10cf944b8d96b0a2a890c52957caf92fcd1f24f69049579" "checksum net2 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "5edf9cb6be97212423aed9413dd4729d62b370b5e1c571750e882cebbbc1e3e2" +"checksum num-traits 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a16a42856a256b39c6d3484f097f6713e14feacd9bfb02290917904fae46c81c" "checksum num_cpus 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a225d1e2717567599c24f88e49f00856c6e825a12125181ee42c4257e3688d39" "checksum openssl-probe 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "756d49c8424483a3df3b5d735112b4da22109ced9a8294f1f5cdf80fb3810919" "checksum openssl-sys 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)" = "5dd48381e9e8a6dce9c4c402db143b2e243f5f872354532f7a009c289b3998ca" @@ -529,6 +559,7 @@ "checksum serde 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1e0ed773960f90a78567fcfbe935284adf50c5d7cf119aa2cf43bb0b4afa69bb" "checksum serde_codegen_internals 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c3172bf2940b975c0e4f6ab42a511c0a4407d4f46ccef87a9d3615db5c26fa96" "checksum serde_derive 0.9.7 (registry+https://github.com/rust-lang/crates.io-index)" = "6af30425c5161deb200aac4803c62b903eb3be7e889c5823d0e16c4ce0ce989c" +"checksum serde_json 0.9.8 (registry+https://github.com/rust-lang/crates.io-index)" = "6501ac6f8b74f9b1033f7ddf79a08edfa0f58d6f8e3190cb8dc97736afa257a8" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694" "checksum syn 0.11.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f4f94368aae82bb29656c98443a7026ca931a659e8d19dcdc41d6e273054e820" diff -r af0dac00d40b -r f3e39e2107fd rust/Cargo.toml --- a/rust/Cargo.toml Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/Cargo.toml Tue Feb 28 22:58:47 2017 +0800 @@ -20,3 +20,5 @@ tokio-curl = "0.1" toml = "0.3" curl = "0.4" +serde_json = "0.9" + diff -r af0dac00d40b -r f3e39e2107fd rust/src/config.rs --- a/rust/src/config.rs Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/src/config.rs Tue Feb 28 22:58:47 2017 +0800 @@ -5,6 +5,8 @@ use std::io::Read; use serde::{Serialize,Deserialize,Deserializer,Serializer}; +use types::*; + #[derive(Deserialize,Serialize,Debug,Clone)] #[allow(non_snake_case)] pub struct Config { @@ -69,7 +71,7 @@ toml::to_string(self).unwrap() } - pub fn merge(&self, conf: &str) -> Result> { + pub fn merge(&self, conf: &str) -> Result { // convert existing and new toml into tables, combine them. let mut new_toml = toml::from_str(conf)?; let mut ex_val = toml::Value::try_from(self).unwrap(); @@ -80,7 +82,7 @@ Ok(ret) } - pub fn merge_file(&self, filename: &str) -> Result> { + pub fn merge_file(&self, filename: &str) -> Result { let mut s = String::new(); File::open(filename)?.read_to_string(&mut s)?; diff -r af0dac00d40b -r f3e39e2107fd rust/src/fridge.rs --- a/rust/src/fridge.rs Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/src/fridge.rs Tue Feb 28 22:58:47 2017 +0800 @@ -4,6 +4,7 @@ use std; use std::io; use std::mem; +use std::error::Error; use std::time::{Duration,Instant}; use futures::{Future,future,Sink,Stream}; @@ -38,7 +39,7 @@ impl Sink for Fridge { type SinkItem = Message; - type SinkError = std::io::Error; + type SinkError = TemplogError; fn start_send(&mut self, msg: Message) -> futures::StartSend { @@ -89,11 +90,11 @@ /// * invalid wort timeout /// All specified in next_wakeup() pub fn wakeups(&mut self) - -> Box> { + -> Box> { mem::replace(&mut self.timeout_r, None) .expect("Fridge::wakeups() can only be called once") .map(|v| Message::Tick(v)) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "Something wrong with watcher timeout channel")) + .map_err(|e| TemplogError::new("wakeups() receive failed")) .boxed() } diff -r af0dac00d40b -r f3e39e2107fd rust/src/main.rs --- a/rust/src/main.rs Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/src/main.rs Tue Feb 28 22:58:47 2017 +0800 @@ -7,6 +7,7 @@ extern crate time; extern crate tokio_curl; extern crate curl; +extern crate serde_json; #[macro_use] extern crate lazy_static; @@ -44,7 +45,7 @@ let mut fridge = fridge::Fridge::new(&config, nowait, paramh.p, &handle); let (fridge_reading_s, fridge_reading_r) = mpsc::channel(1); - let fridge_reading_r = fridge_reading_r.map_err(|_| io::Error::new(io::ErrorKind::Other, "Problem with fridge_reading_r channel")); + let fridge_reading_r = fridge_reading_r.map_err(|e| TemplogError::new("Problem with fridge_reading_r channel")); let sensor_stream = if testmode { sensor::TestSensor::new(config).stream(&handle) @@ -70,7 +71,7 @@ r }); - let param_stream = paramwaiter::ParamWaiter::stream(&handle); + let param_stream = paramwaiter::ParamWaiter::new(config, &handle).stream(); let p = param_stream.map(|p| { fridge::Message::Params(p) }); diff -r af0dac00d40b -r f3e39e2107fd rust/src/paramwaiter.rs --- a/rust/src/paramwaiter.rs Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/src/paramwaiter.rs Tue Feb 28 22:58:47 2017 +0800 @@ -1,65 +1,113 @@ extern crate tokio_core; extern crate futures; extern crate rand; +extern crate serde_json; use std::time::Duration; use std::io; +use std::str; +use std::rc::Rc; +use std::sync::{Arc,Mutex}; +use std::error::Error; use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; use tokio_curl::Session; use futures::{Stream,Future,future}; +use curl::easy::Easy; + use types::*; -use curl::Easy; use ::Config; +#[derive(Clone)] pub struct ParamWaiter { limitlog: NotTooOften, epoch_tag: String, session: Option, config: Config, + handle: Handle, } -const LOGMINUTES: u64 = 15; +const LOG_MINUTES: u64 = 15; +const MAX_RESPONSE_SIZE: usize = 10000; impl ParamWaiter { - pub fn new(config: &Config) -> Self { + pub fn new(config: &Config, handle: &Handle) -> Self { ParamWaiter { - limitlog: NotTooOften::new(LOGMINUTES*60), + limitlog: NotTooOften::new(LOG_MINUTES*60), epoch_tag: String::new(), session: None, config: config.clone(), + handle: handle.clone(), } } fn make_req(&self) -> Easy { let mut req = Easy::new(); req.get(true).unwrap(); - req.url(config.SETTINGS_URL); + // supposedly req.url won't fail, checking is later? + req.url(&self.config.SETTINGS_URL).unwrap(); + req } - fn step(&mut self, handle: &Handle) -> Box, Error=io::Error>> { + fn handle_response(&self, req: &mut Easy, buf: &Vec) -> Result, TemplogError> { + let text = String::from_utf8_lossy(buf).to_string(); + let resp = req.response_code()?; + match resp { + 200 => Ok(Some(serde_json::from_str(&text)?)), // new params + 304 => Ok(None), // unmodified (long polling timeout at the server) + _ => { + Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) + }, + } + } + + fn step(&mut self) -> Box, Error=TemplogError>> { if self.session.is_none() { - self.session = Some(Session::new(handle.clone())) + self.session = Some(Session::new(self.handle.clone())) } - let req = self.make_req(); - /* - self.session.unwrap().perform(self.make_req()) - .and_then(||) - */ + let mut req = self.make_req(); + let buf = Arc::new(Mutex::new(Vec::new())); + let dst = buf.clone(); + req.write_function(move |data| { + let mut dst = dst.lock().unwrap(); + dst.extend_from_slice(data); + if dst.len() > MAX_RESPONSE_SIZE { + debug!("Too large params response from server: {}", dst.len()); + Ok(0) + } else { + Ok(data.len()) + } + }).unwrap(); + + // XXX too many clones + let se = self.clone(); + let s = self.clone().session.unwrap().perform(req) + .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) + .and_then(move |mut es| { + let result = buf.lock().unwrap(); + se.handle_response(&mut es, &result) + }); + Box::new(s) + + /* let mut p = Params::defaults(); p.fridge_setpoint = 17.0 + 4.0*rand::random::(); - future::ok(p).boxed() + future::ok(Some(p)).boxed() + */ } - pub fn stream(&mut self, handle: &Handle) -> Box> { + pub fn stream(&mut self) -> Box> { let dur = Duration::from_millis(4000); - let i = Interval::new(dur, handle).unwrap() + let mut s = Rc::new(self.clone()); + let i = Interval::new(dur, &self.handle).unwrap() + .map_err(|e| TemplogError::new_io("interval failed", e)) .and_then(move |()| { - s.step() + let ss = Rc::get_mut(&mut s).unwrap(); + ss.step() }) // throw away None params .filter_map(|p| p); diff -r af0dac00d40b -r f3e39e2107fd rust/src/sensor.rs --- a/rust/src/sensor.rs Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/src/sensor.rs Tue Feb 28 22:58:47 2017 +0800 @@ -19,8 +19,7 @@ use ::Config; pub trait Sensor { - fn stream(&self, handle: &Handle) - -> Box>; + fn stream(&self, handle: &Handle) -> Box>; } #[derive(Clone)] @@ -51,7 +50,7 @@ r } - fn read_sensor(&self, n: &str) -> Result> { + fn read_sensor(&self, n: &str) -> Result { lazy_static! { // multiline static ref THERM_RE: regex::Regex = regex::Regex::new("(?m).* YES\n.*t=(.*)\n").unwrap(); @@ -71,7 +70,7 @@ Ok(f32::from_str(v)?) } - fn sensor_names(&self) -> Result, Box> { + fn sensor_names(&self) -> Result, TemplogError> { // TODO: needs to handle multiple busses. let mut path = PathBuf::from(&self.config.SENSOR_BASE_DIR); path.push("w1_master_slaves"); @@ -84,17 +83,18 @@ impl Sensor for OneWireSensor { - fn stream(&self, handle: &Handle) -> Box> { + fn stream(&self, handle: &Handle) -> Box> { let pool = futures_cpupool::CpuPool::new(4); // TODO: how many? let dur = Duration::new(self.config.SENSOR_SLEEP,0); let s = Arc::new(self.clone()); - Interval::new(dur, handle).unwrap() + let i = Interval::new(dur, handle).unwrap() + .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e)) .and_then(move |()| { let a = s.clone(); pool.spawn_fn(move || Ok(a.step())) - }) - .boxed() + }); + consume_errors(i) } } @@ -118,7 +118,7 @@ r } - fn try_read(filename: &str) -> Result> { + fn try_read(filename: &str) -> Result { let mut s = String::new(); File::open(filename)?.read_to_string(&mut s)?; Ok(s.trim().parse::()?) @@ -126,14 +126,15 @@ } impl Sensor for TestSensor { - fn stream(&self, handle: &Handle) - -> Box> { + fn stream(&self, handle: &Handle) -> Box> { let dur = Duration::new(self.config.SENSOR_SLEEP,0); - Interval::new(dur, handle).unwrap() + let i = Interval::new(dur, handle).unwrap() + .map_err(|e| TemplogError::new_io("Interval failed. Should not happen", e)) .and_then(move |()| { Ok(Self::test_step()) - }).boxed() + }); + consume_errors(i) } } diff -r af0dac00d40b -r f3e39e2107fd rust/src/types.rs --- a/rust/src/types.rs Thu Feb 23 23:27:09 2017 +0800 +++ b/rust/src/types.rs Tue Feb 28 22:58:47 2017 +0800 @@ -2,11 +2,17 @@ use std::time::{Duration,Instant}; use std::error::Error; use std::fmt; +use std::io; use std::cmp; use std::cell::Cell; use std::iter::FromIterator; +use std; +use futures::{Stream,IntoFuture}; use serde::{Deserialize,Serialize}; +use toml; +use curl; +use serde_json; #[derive(Deserialize, Serialize, Debug)] pub struct Params { @@ -77,34 +83,128 @@ } #[derive(Debug)] +pub enum TemplogErrorKind { + None, + Io(io::Error), + ParseFloat(std::num::ParseFloatError), + TomlDe(toml::de::Error), + SerdeJson(serde_json::Error), + Curl(curl::Error), +} + +#[derive(Debug)] pub struct TemplogError { desc: String, + kind: TemplogErrorKind, } impl Error for TemplogError { - fn description(&self) -> &str { &self.desc } - fn cause(&self) -> Option<&Error> { None } + fn description(&self) -> &str { + &format!("{}", self) + } + + fn cause(&self) -> Option<&Error> { + match self.kind { + TemplogErrorKind::None => None, + TemplogErrorKind::Io(e) => Some(&e), + TemplogErrorKind::ParseFloat(e) => Some(&e), + TemplogErrorKind::TomlDe(e) => Some(&e), + TemplogErrorKind::SerdeJson(e) => Some(&e), + TemplogErrorKind::Curl(e) => Some(&e), + } + } } impl fmt::Display for TemplogError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "TemplogError: {}", self.desc); + match self.kind { + TemplogErrorKind::None => write!(f, "Templog Error: {}", self.desc), + TemplogErrorKind::Io(e) => write!(f, "Templog IO error {}: {}", self.desc, e), + TemplogErrorKind::TomlDe(e) => write!(f, "Templog toml error {}: {}", self.desc, e), + TemplogErrorKind::SerdeJson(e) => write!(f, "Json decode error {}: {}", self.desc, e), + TemplogErrorKind::ParseFloat(e) => write!(f, "Templog parse error {}: {}", self.desc, e), + TemplogErrorKind::Curl(e) => write!(f, "Templog curl http error {}: {}", self.desc, e), + }; Ok(()) } - - } impl TemplogError { pub fn new(desc: &str) -> Self { TemplogError { desc: desc.to_string(), + kind: TemplogErrorKind::None, + } + } + + pub fn new_io(desc: &str, e: io::Error) -> Self { + TemplogError { + desc: desc.to_string(), + kind: TemplogErrorKind::Io(e), + } + } + + pub fn new_toml_de(desc: &str, e: toml::de::Error) -> Self { + TemplogError { + desc: desc.to_string(), + kind: TemplogErrorKind::TomlDe(e), + } + } + + pub fn new_parse_float(desc: &str, e: std::num::ParseFloatError) -> Self { + TemplogError { + desc: desc.to_string(), + kind: TemplogErrorKind::ParseFloat(e), + } + } + + pub fn new_curl(desc: &str, e: curl::Error) -> Self { + TemplogError { + desc: desc.to_string(), + kind: TemplogErrorKind::Curl(e), + } + } + + pub fn new_serde_json(desc: &str, e: serde_json::Error) -> Self { + TemplogError { + desc: desc.to_string(), + kind: TemplogErrorKind::SerdeJson(e), } } } +impl From for TemplogError { + fn from(e: io::Error) -> Self { + TemplogError::new_io("", e) + } +} + +impl From for TemplogError { + fn from(e: toml::de::Error) -> Self { + TemplogError::new_toml_de("", e) + } +} + +impl From for TemplogError { + fn from(e: std::num::ParseFloatError) -> Self { + TemplogError::new_parse_float("", e) + } +} + +impl From for TemplogError { + fn from(e: curl::Error) -> Self { + TemplogError::new_curl("", e) + } +} + +impl From for TemplogError { + fn from(e: serde_json::Error) -> Self { + TemplogError::new_serde_json("", e) + } +} /// Call closures with a rate limit. Useful for log message ratelimiting +#[derive(Clone)] pub struct NotTooOften { last: Cell, limit: Duration, @@ -200,3 +300,22 @@ } } +/// Takes a stream and returns a stream without errors. +pub fn consume_errors(s: S) -> Box> +// XXX not sure why 'static is really necessary here? + where + S: Stream+Send+'static, + ::Error: std::fmt::Display+Send+'static, + ::Item: Send+'static, + { + s.then(|r| { + match r { + Ok(v) => Ok(Some(v)), + Err(e) => { + debug!("Stream error: {}", e); + Ok(None) + } + } + }) + .filter_map(|p| p).boxed() +}