changeset 627:d5075136442f rust

futures await
author Matt Johnston <matt@ucc.asn.au>
date Tue, 06 Feb 2018 22:16:44 +0800
parents efcbe0d3afd6
children e1b5938de122
files rust/Cargo.lock rust/Cargo.toml rust/src/main.rs rust/src/params.rs
diffstat 4 files changed, 106 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/rust/Cargo.lock	Wed Dec 06 00:09:45 2017 +0800
+++ b/rust/Cargo.lock	Tue Feb 06 22:16:44 2018 +0800
@@ -118,6 +118,60 @@
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "futures-await"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-await-async-macro 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-await-await-macro 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures-await-async-macro"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-await-syn 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-await-synom 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures-await-await-macro"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "futures-await-quote"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures-await-syn"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-await-synom 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures-await-synom"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "futures-cpupool"
 version = "0.1.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -312,6 +366,14 @@
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "proc-macro2"
+version = "0.1.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "quote"
 version = "0.3.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -603,6 +665,11 @@
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "unicode-xid"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "unreachable"
 version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -649,6 +716,7 @@
  "docopt 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-await 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "futures-cpupool 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.11.7 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -692,6 +760,12 @@
 "checksum fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f6c0581a4e363262e52b87f59ee2afe3415361c6ec35e665924eb08afe8ff159"
 "checksum fuchsia-zircon-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "43f3795b4bae048dc6123a6b972cadde2e676f9ded08aef6bb77f5f157684a82"
 "checksum futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "118b49cac82e04121117cbd3121ede3147e885627d82c4546b87c702debb90c1"
+"checksum futures-await 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "84431acb0f168d02bd7727ad9fa385ff877e46d6018efad17ca509ae3bf5457c"
+"checksum futures-await-async-macro 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2868c9550459b113f8a656bd8f665bcdcfffb794e3fe5fbeaf5734325d18c2b5"
+"checksum futures-await-await-macro 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b7adba18b51fd888a24f6bd41c85e4f544a7089b15f84242350c014f9fdbf895"
+"checksum futures-await-quote 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f310765f0efc9c12ffb4887ca48d87a71c44ba531d9ba23055681a879f98ab75"
+"checksum futures-await-syn 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "90dcf027151897fe50205762efdec791be0e1e2a018d0ae077f72aa0abbf947f"
+"checksum futures-await-synom 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c84de0ba04a67d7fc0fb4e3218ba539da65890549922d9d2bc874ba6240030"
 "checksum futures-cpupool 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "e86f49cc0d92fe1b97a5980ec32d56208272cbb00f15044ea9e2799dde766fdf"
 "checksum httparse 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "af2f2dd97457e8fb1ae7c5a420db346af389926e36f43768b96f101546b04a07"
 "checksum hyper 0.11.7 (registry+https://github.com/rust-lang/crates.io-index)" = "4959ca95f55df4265bff2ad63066147255e6fa733682cf6d1cb5eaff6e53324b"
@@ -714,6 +788,7 @@
 "checksum num-traits 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "cacfcab5eb48250ee7d0c7896b51a2c5eec99c1feea5f32025635f5ae4b00070"
 "checksum num_cpus 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "514f0d73e64be53ff320680ca671b64fe3fb91da01e1ae2ddc99eb51d453b20d"
 "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
+"checksum proc-macro2 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "557facecbf90ff79faea80a08230d10c812016aa19198ed07d06de61f965b5cc"
 "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"
 "checksum rand 0.3.18 (registry+https://github.com/rust-lang/crates.io-index)" = "6475140dfd8655aeb72e1fd4b7a1cc1c202be65d71669476e392fe62532b9edd"
 "checksum redox_syscall 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "ab105df655884ede59d45b7070c8a65002d921461ee813a024558ca16030eea0"
@@ -751,6 +826,7 @@
 "checksum toml 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bd86ad9ebee246fdedd610e0f6d0587b754a3d81438db930a244d0480ed7878f"
 "checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a"
 "checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
+"checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc"
 "checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56"
 "checksum utf8-ranges 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a1ca13c08c41c9c3e04224ed9ff80461d97e121589ff27c753a16cb10830ae0f"
 "checksum utf8-ranges 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "662fab6525a98beff2921d7f61a39e7d59e0b425ebc7d0d9e66d316e55124122"
--- a/rust/Cargo.toml	Wed Dec 06 00:09:45 2017 +0800
+++ b/rust/Cargo.toml	Tue Feb 06 22:16:44 2018 +0800
@@ -23,6 +23,7 @@
 base64 = "0.4"
 libc = "0.2"
 atomicwrites = "0.1"
+futures-await = "0.1"
 
 [target.'cfg(target_os = "linux")'.dependencies]
 sysfs_gpio = "0.5"
--- a/rust/src/main.rs	Wed Dec 06 00:09:45 2017 +0800
+++ b/rust/src/main.rs	Tue Feb 06 22:16:44 2018 +0800
@@ -1,5 +1,8 @@
+#![feature(proc_macro, conservative_impl_trait, generators)]
+
+
 extern crate tokio_core;
-extern crate futures;
+extern crate futures_await as futures;
 #[macro_use]
 extern crate log;
 extern crate env_logger;
@@ -71,7 +74,7 @@
         r
     });
 
-    let param_stream = params::ParamWaiter::stream(config, &handle);
+    let param_stream = params::ParamWaiter::stream(config.clone(), handle.clone());
     let param_stream = param_stream.map(|p| {
             fridge::Message::Params(p)
         });
--- a/rust/src/params.rs	Wed Dec 06 00:09:45 2017 +0800
+++ b/rust/src/params.rs	Tue Feb 06 22:16:44 2018 +0800
@@ -1,5 +1,5 @@
 extern crate tokio_core;
-extern crate futures;
+extern crate futures_await as futures;
 extern crate rand;
 extern crate serde_json;
 extern crate base64;
@@ -14,6 +14,8 @@
 use std::cell::{Cell,RefCell};
 use std::fs::File;
 use std::io::Read;
+use futures::prelude::*;
+
 
 use tokio_core::reactor::Interval;
 use tokio_core::reactor::Handle;
@@ -93,7 +95,7 @@
         let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config");
         let mut req = hyper::client::Request::new(hyper::Method::Get, uri);
         {
-            let mut headers = req.headers_mut();
+            let headers = req.headers_mut();
             headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone())));
         }
         hyper::client::Client::new(&self.handle).request(req)
@@ -137,57 +139,38 @@
         });
     }
 
-    fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> {
-        let resp = rself.make_request();
+    #[async_stream(item = Params)]
+    pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {
+        let rcself = Rc::new(ParamWaiter::new(config, handle));
 
-        let s = resp.map_err(|e| TemplogError::new_hyper("response", e))
-                    .and_then(move |r| {
-                        let status = r.status();
-                        r.body().concat2()
-                            .map_err(|e| TemplogError::new_hyper("body", e))
-                            .and_then(move |b| {
-                                rself.handle_response(b, status)
-                            })
-                    });
-        Box::new(s)
+        let dur = Duration::from_millis(4000);
+        #[async]
+        for _ in Interval::new(dur, &rcself.handle).unwrap() {
+            // fetch params
+            // TODO - skip if inflight.
+            let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?;
+            let status = r.status();
+            let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?;
+            if let Ok(params) = rcself.handle_response(b, status) {
+                stream_yield!(params);
+            }
+        }
+        Ok(())
     }
 
-    fn new(config: &Config, handle: &Handle) -> Self {
+    fn new(config: Config, handle: Handle) -> Self {
         let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
         rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b);
         let epoch = base64::encode(&b);
+
         ParamWaiter {
             limitlog: NotTooOften::new(LOG_MINUTES*60),
             last_etag: RefCell::new(String::new()),
             epoch: epoch,
-            config: config.clone(),
-            handle: handle.clone(),
+            config: config,
+            handle: handle,
         }
     }
 
-    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
-        let rcself = Rc::new(ParamWaiter::new(config, handle));
-
-        let dur = Duration::from_millis(4000);
-        let i = Interval::new(dur, &rcself.handle).unwrap()
-            .map_err(|e| TemplogError::new_io("interval failed", e))
-            .and_then(move |()| {
-                Self::step(rcself.clone())
-            });
-
-        // TODO use consume_errors() instead once "impl trait" is stable
-        //Box::new(consume_errors(i))
-        let f = i.then(|r| {
-            match r {
-                Ok(v) => Ok(Some(v)),
-                Err(e) => {
-                    debug!("Params stream error: {}", e);
-                    Ok(None)
-                }
-            }
-        })
-        .filter_map(|p| p);
-        Box::new(f)
-    }
 }