comparison rust/src/params.rs @ 627:d5075136442f rust

futures await
author Matt Johnston <matt@ucc.asn.au>
date Tue, 06 Feb 2018 22:16:44 +0800
parents efcbe0d3afd6
children 3e5e52d50af5
comparison
equal deleted inserted replaced
626:efcbe0d3afd6 627:d5075136442f
1 extern crate tokio_core; 1 extern crate tokio_core;
2 extern crate futures; 2 extern crate futures_await as futures;
3 extern crate rand; 3 extern crate rand;
4 extern crate serde_json; 4 extern crate serde_json;
5 extern crate base64; 5 extern crate base64;
6 extern crate atomicwrites; 6 extern crate atomicwrites;
7 7
12 use std::sync::{Arc,Mutex}; 12 use std::sync::{Arc,Mutex};
13 use std::error::Error; 13 use std::error::Error;
14 use std::cell::{Cell,RefCell}; 14 use std::cell::{Cell,RefCell};
15 use std::fs::File; 15 use std::fs::File;
16 use std::io::Read; 16 use std::io::Read;
17 use futures::prelude::*;
18
17 19
18 use tokio_core::reactor::Interval; 20 use tokio_core::reactor::Interval;
19 use tokio_core::reactor::Handle; 21 use tokio_core::reactor::Handle;
20 use futures::{Stream,Future,future}; 22 use futures::{Stream,Future,future};
21 use self::rand::Rng; 23 use self::rand::Rng;
91 impl ParamWaiter { 93 impl ParamWaiter {
92 fn make_request(&self) -> hyper::client::FutureResponse { 94 fn make_request(&self) -> hyper::client::FutureResponse {
93 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); 95 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config");
94 let mut req = hyper::client::Request::new(hyper::Method::Get, uri); 96 let mut req = hyper::client::Request::new(hyper::Method::Get, uri);
95 { 97 {
96 let mut headers = req.headers_mut(); 98 let headers = req.headers_mut();
97 headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); 99 headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone())));
98 } 100 }
99 hyper::client::Client::new(&self.handle).request(req) 101 hyper::client::Client::new(&self.handle).request(req)
100 // XXX how do we do this? 102 // XXX how do we do this?
101 // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); 103 // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
135 p.write(|f| { 137 p.write(|f| {
136 serde_json::to_writer(f, params) 138 serde_json::to_writer(f, params)
137 }); 139 });
138 } 140 }
139 141
140 fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { 142 #[async_stream(item = Params)]
141 let resp = rself.make_request(); 143 pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {
144 let rcself = Rc::new(ParamWaiter::new(config, handle));
142 145
143 let s = resp.map_err(|e| TemplogError::new_hyper("response", e)) 146 let dur = Duration::from_millis(4000);
144 .and_then(move |r| { 147 #[async]
145 let status = r.status(); 148 for _ in Interval::new(dur, &rcself.handle).unwrap() {
146 r.body().concat2() 149 // fetch params
147 .map_err(|e| TemplogError::new_hyper("body", e)) 150 // TODO - skip if inflight.
148 .and_then(move |b| { 151 let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?;
149 rself.handle_response(b, status) 152 let status = r.status();
150 }) 153 let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?;
151 }); 154 if let Ok(params) = rcself.handle_response(b, status) {
152 Box::new(s) 155 stream_yield!(params);
156 }
157 }
158 Ok(())
153 } 159 }
154 160
155 fn new(config: &Config, handle: &Handle) -> Self { 161 fn new(config: Config, handle: Handle) -> Self {
156 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 162 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
157 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b); 163 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b);
158 let epoch = base64::encode(&b); 164 let epoch = base64::encode(&b);
165
159 ParamWaiter { 166 ParamWaiter {
160 limitlog: NotTooOften::new(LOG_MINUTES*60), 167 limitlog: NotTooOften::new(LOG_MINUTES*60),
161 last_etag: RefCell::new(String::new()), 168 last_etag: RefCell::new(String::new()),
162 epoch: epoch, 169 epoch: epoch,
163 config: config.clone(), 170 config: config,
164 handle: handle.clone(), 171 handle: handle,
165 } 172 }
166 } 173 }
167 174
168 pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
169 let rcself = Rc::new(ParamWaiter::new(config, handle));
170
171 let dur = Duration::from_millis(4000);
172 let i = Interval::new(dur, &rcself.handle).unwrap()
173 .map_err(|e| TemplogError::new_io("interval failed", e))
174 .and_then(move |()| {
175 Self::step(rcself.clone())
176 });
177
178 // TODO use consume_errors() instead once "impl trait" is stable
179 //Box::new(consume_errors(i))
180 let f = i.then(|r| {
181 match r {
182 Ok(v) => Ok(Some(v)),
183 Err(e) => {
184 debug!("Params stream error: {}", e);
185 Ok(None)
186 }
187 }
188 })
189 .filter_map(|p| p);
190 Box::new(f)
191 }
192 } 175 }
193 176