Mercurial > templog
changeset 627:d5075136442f rust
futures await
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 06 Feb 2018 22:16:44 +0800 |
parents | efcbe0d3afd6 |
children | e1b5938de122 |
files | rust/Cargo.lock rust/Cargo.toml rust/src/main.rs rust/src/params.rs |
diffstat | 4 files changed, 106 insertions(+), 43 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/Cargo.lock Wed Dec 06 00:09:45 2017 +0800 +++ b/rust/Cargo.lock Tue Feb 06 22:16:44 2018 +0800 @@ -118,6 +118,60 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "futures-await" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-await-async-macro 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-await-await-macro 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-await-async-macro" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-await-syn 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-await-synom 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-await-await-macro" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures-await-quote" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-await-syn" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-await-synom 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-await-synom" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "futures-cpupool" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -312,6 +366,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "proc-macro2" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "quote" version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -603,6 +665,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "unicode-xid" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "unreachable" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -649,6 +716,7 @@ "docopt 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-await 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.7 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -692,6 +760,12 @@ "checksum fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f6c0581a4e363262e52b87f59ee2afe3415361c6ec35e665924eb08afe8ff159" "checksum fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "43f3795b4bae048dc6123a6b972cadde2e676f9ded08aef6bb77f5f157684a82" "checksum futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "118b49cac82e04121117cbd3121ede3147e885627d82c4546b87c702debb90c1" +"checksum futures-await 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "84431acb0f168d02bd7727ad9fa385ff877e46d6018efad17ca509ae3bf5457c" +"checksum futures-await-async-macro 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2868c9550459b113f8a656bd8f665bcdcfffb794e3fe5fbeaf5734325d18c2b5" +"checksum futures-await-await-macro 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b7adba18b51fd888a24f6bd41c85e4f544a7089b15f84242350c014f9fdbf895" +"checksum futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f310765f0efc9c12ffb4887ca48d87a71c44ba531d9ba23055681a879f98ab75" +"checksum futures-await-syn 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "90dcf027151897fe50205762efdec791be0e1e2a018d0ae077f72aa0abbf947f" +"checksum futures-await-synom 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c84de0ba04a67d7fc0fb4e3218ba539da65890549922d9d2bc874ba6240030" "checksum futures-cpupool 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "e86f49cc0d92fe1b97a5980ec32d56208272cbb00f15044ea9e2799dde766fdf" "checksum httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "af2f2dd97457e8fb1ae7c5a420db346af389926e36f43768b96f101546b04a07" "checksum hyper 0.11.7 (registry+https://github.com/rust-lang/crates.io-index)" = "4959ca95f55df4265bff2ad63066147255e6fa733682cf6d1cb5eaff6e53324b" @@ -714,6 +788,7 @@ "checksum num-traits 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "cacfcab5eb48250ee7d0c7896b51a2c5eec99c1feea5f32025635f5ae4b00070" "checksum num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "514f0d73e64be53ff320680ca671b64fe3fb91da01e1ae2ddc99eb51d453b20d" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" +"checksum proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "557facecbf90ff79faea80a08230d10c812016aa19198ed07d06de61f965b5cc" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" "checksum rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6475140dfd8655aeb72e1fd4b7a1cc1c202be65d71669476e392fe62532b9edd" "checksum redox_syscall 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "ab105df655884ede59d45b7070c8a65002d921461ee813a024558ca16030eea0" @@ -751,6 +826,7 @@ "checksum toml 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bd86ad9ebee246fdedd610e0f6d0587b754a3d81438db930a244d0480ed7878f" "checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a" "checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc" +"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f" "checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
--- a/rust/Cargo.toml Wed Dec 06 00:09:45 2017 +0800 +++ b/rust/Cargo.toml Tue Feb 06 22:16:44 2018 +0800 @@ -23,6 +23,7 @@ base64 = "0.4" libc = "0.2" atomicwrites = "0.1" +futures-await = "0.1" [target.'cfg(target_os = "linux")'.dependencies] sysfs_gpio = "0.5"
--- a/rust/src/main.rs Wed Dec 06 00:09:45 2017 +0800 +++ b/rust/src/main.rs Tue Feb 06 22:16:44 2018 +0800 @@ -1,5 +1,8 @@ +#![feature(proc_macro, conservative_impl_trait, generators)] + + extern crate tokio_core; -extern crate futures; +extern crate futures_await as futures; #[macro_use] extern crate log; extern crate env_logger; @@ -71,7 +74,7 @@ r }); - let param_stream = params::ParamWaiter::stream(config, &handle); + let param_stream = params::ParamWaiter::stream(config.clone(), handle.clone()); let param_stream = param_stream.map(|p| { fridge::Message::Params(p) });
--- a/rust/src/params.rs Wed Dec 06 00:09:45 2017 +0800 +++ b/rust/src/params.rs Tue Feb 06 22:16:44 2018 +0800 @@ -1,5 +1,5 @@ extern crate tokio_core; -extern crate futures; +extern crate futures_await as futures; extern crate rand; extern crate serde_json; extern crate base64; @@ -14,6 +14,8 @@ use std::cell::{Cell,RefCell}; use std::fs::File; use std::io::Read; +use futures::prelude::*; + use tokio_core::reactor::Interval; use tokio_core::reactor::Handle; @@ -93,7 +95,7 @@ let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); let mut req = hyper::client::Request::new(hyper::Method::Get, uri); { - let mut headers = req.headers_mut(); + let headers = req.headers_mut(); headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); } hyper::client::Client::new(&self.handle).request(req) @@ -137,57 +139,38 @@ }); } - fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { - let resp = rself.make_request(); + #[async_stream(item = Params)] + pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { + let rcself = Rc::new(ParamWaiter::new(config, handle)); - let s = resp.map_err(|e| TemplogError::new_hyper("response", e)) - .and_then(move |r| { - let status = r.status(); - r.body().concat2() - .map_err(|e| TemplogError::new_hyper("body", e)) - .and_then(move |b| { - rself.handle_response(b, status) - }) - }); - Box::new(s) + let dur = Duration::from_millis(4000); + #[async] + for _ in Interval::new(dur, &rcself.handle).unwrap() { + // fetch params + // TODO - skip if inflight. + let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?; + let status = r.status(); + let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?; + if let Ok(params) = rcself.handle_response(b, status) { + stream_yield!(params); + } + } + Ok(()) } - fn new(config: &Config, handle: &Handle) -> Self { + fn new(config: Config, handle: Handle) -> Self { let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b); let epoch = base64::encode(&b); + ParamWaiter { limitlog: NotTooOften::new(LOG_MINUTES*60), last_etag: RefCell::new(String::new()), epoch: epoch, - config: config.clone(), - handle: handle.clone(), + config: config, + handle: handle, } } - pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { - let rcself = Rc::new(ParamWaiter::new(config, handle)); - - let dur = Duration::from_millis(4000); - let i = Interval::new(dur, &rcself.handle).unwrap() - .map_err(|e| TemplogError::new_io("interval failed", e)) - .and_then(move |()| { - Self::step(rcself.clone()) - }); - - // TODO use consume_errors() instead once "impl trait" is stable - //Box::new(consume_errors(i)) - let f = i.then(|r| { - match r { - Ok(v) => Ok(Some(v)), - Err(e) => { - debug!("Params stream error: {}", e); - Ok(None) - } - } - }) - .filter_map(|p| p); - Box::new(f) - } }