diff rust/src/params.rs @ 627:d5075136442f rust

futures await
author Matt Johnston <matt@ucc.asn.au>
date Tue, 06 Feb 2018 22:16:44 +0800
parents efcbe0d3afd6
children 3e5e52d50af5
line wrap: on
line diff
--- a/rust/src/params.rs	Wed Dec 06 00:09:45 2017 +0800
+++ b/rust/src/params.rs	Tue Feb 06 22:16:44 2018 +0800
@@ -1,5 +1,5 @@
 extern crate tokio_core;
-extern crate futures;
+extern crate futures_await as futures;
 extern crate rand;
 extern crate serde_json;
 extern crate base64;
@@ -14,6 +14,8 @@
 use std::cell::{Cell,RefCell};
 use std::fs::File;
 use std::io::Read;
+use futures::prelude::*;
+
 
 use tokio_core::reactor::Interval;
 use tokio_core::reactor::Handle;
@@ -93,7 +95,7 @@
         let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config");
         let mut req = hyper::client::Request::new(hyper::Method::Get, uri);
         {
-            let mut headers = req.headers_mut();
+            let headers = req.headers_mut();
             headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone())));
         }
         hyper::client::Client::new(&self.handle).request(req)
@@ -137,57 +139,38 @@
         });
     }
 
-    fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> {
-        let resp = rself.make_request();
+    #[async_stream(item = Params)]
+    pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {
+        let rcself = Rc::new(ParamWaiter::new(config, handle));
 
-        let s = resp.map_err(|e| TemplogError::new_hyper("response", e))
-                    .and_then(move |r| {
-                        let status = r.status();
-                        r.body().concat2()
-                            .map_err(|e| TemplogError::new_hyper("body", e))
-                            .and_then(move |b| {
-                                rself.handle_response(b, status)
-                            })
-                    });
-        Box::new(s)
+        let dur = Duration::from_millis(4000);
+        #[async]
+        for _ in Interval::new(dur, &rcself.handle).unwrap() {
+            // fetch params
+            // TODO - skip if inflight.
+            let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?;
+            let status = r.status();
+            let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?;
+            if let Ok(params) = rcself.handle_response(b, status) {
+                stream_yield!(params);
+            }
+        }
+        Ok(())
     }
 
-    fn new(config: &Config, handle: &Handle) -> Self {
+    fn new(config: Config, handle: Handle) -> Self {
         let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
         rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b);
         let epoch = base64::encode(&b);
+
         ParamWaiter {
             limitlog: NotTooOften::new(LOG_MINUTES*60),
             last_etag: RefCell::new(String::new()),
             epoch: epoch,
-            config: config.clone(),
-            handle: handle.clone(),
+            config: config,
+            handle: handle,
         }
     }
 
-    pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
-        let rcself = Rc::new(ParamWaiter::new(config, handle));
-
-        let dur = Duration::from_millis(4000);
-        let i = Interval::new(dur, &rcself.handle).unwrap()
-            .map_err(|e| TemplogError::new_io("interval failed", e))
-            .and_then(move |()| {
-                Self::step(rcself.clone())
-            });
-
-        // TODO use consume_errors() instead once "impl trait" is stable
-        //Box::new(consume_errors(i))
-        let f = i.then(|r| {
-            match r {
-                Ok(v) => Ok(Some(v)),
-                Err(e) => {
-                    debug!("Params stream error: {}", e);
-                    Ok(None)
-                }
-            }
-        })
-        .filter_map(|p| p);
-        Box::new(f)
-    }
 }