Mercurial > templog
comparison rust/src/params.rs @ 627:d5075136442f rust
futures await
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 06 Feb 2018 22:16:44 +0800 |
parents | efcbe0d3afd6 |
children | 3e5e52d50af5 |
comparison
equal
deleted
inserted
replaced
626:efcbe0d3afd6 | 627:d5075136442f |
---|---|
1 extern crate tokio_core; | 1 extern crate tokio_core; |
2 extern crate futures; | 2 extern crate futures_await as futures; |
3 extern crate rand; | 3 extern crate rand; |
4 extern crate serde_json; | 4 extern crate serde_json; |
5 extern crate base64; | 5 extern crate base64; |
6 extern crate atomicwrites; | 6 extern crate atomicwrites; |
7 | 7 |
12 use std::sync::{Arc,Mutex}; | 12 use std::sync::{Arc,Mutex}; |
13 use std::error::Error; | 13 use std::error::Error; |
14 use std::cell::{Cell,RefCell}; | 14 use std::cell::{Cell,RefCell}; |
15 use std::fs::File; | 15 use std::fs::File; |
16 use std::io::Read; | 16 use std::io::Read; |
17 use futures::prelude::*; | |
18 | |
17 | 19 |
18 use tokio_core::reactor::Interval; | 20 use tokio_core::reactor::Interval; |
19 use tokio_core::reactor::Handle; | 21 use tokio_core::reactor::Handle; |
20 use futures::{Stream,Future,future}; | 22 use futures::{Stream,Future,future}; |
21 use self::rand::Rng; | 23 use self::rand::Rng; |
91 impl ParamWaiter { | 93 impl ParamWaiter { |
92 fn make_request(&self) -> hyper::client::FutureResponse { | 94 fn make_request(&self) -> hyper::client::FutureResponse { |
93 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); | 95 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); |
94 let mut req = hyper::client::Request::new(hyper::Method::Get, uri); | 96 let mut req = hyper::client::Request::new(hyper::Method::Get, uri); |
95 { | 97 { |
96 let mut headers = req.headers_mut(); | 98 let headers = req.headers_mut(); |
97 headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); | 99 headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); |
98 } | 100 } |
99 hyper::client::Client::new(&self.handle).request(req) | 101 hyper::client::Client::new(&self.handle).request(req) |
100 // XXX how do we do this? | 102 // XXX how do we do this? |
101 // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); | 103 // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); |
135 p.write(|f| { | 137 p.write(|f| { |
136 serde_json::to_writer(f, params) | 138 serde_json::to_writer(f, params) |
137 }); | 139 }); |
138 } | 140 } |
139 | 141 |
140 fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { | 142 #[async_stream(item = Params)] |
141 let resp = rself.make_request(); | 143 pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { |
144 let rcself = Rc::new(ParamWaiter::new(config, handle)); | |
142 | 145 |
143 let s = resp.map_err(|e| TemplogError::new_hyper("response", e)) | 146 let dur = Duration::from_millis(4000); |
144 .and_then(move |r| { | 147 #[async] |
145 let status = r.status(); | 148 for _ in Interval::new(dur, &rcself.handle).unwrap() { |
146 r.body().concat2() | 149 // fetch params |
147 .map_err(|e| TemplogError::new_hyper("body", e)) | 150 // TODO - skip if inflight. |
148 .and_then(move |b| { | 151 let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?; |
149 rself.handle_response(b, status) | 152 let status = r.status(); |
150 }) | 153 let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?; |
151 }); | 154 if let Ok(params) = rcself.handle_response(b, status) { |
152 Box::new(s) | 155 stream_yield!(params); |
156 } | |
157 } | |
158 Ok(()) | |
153 } | 159 } |
154 | 160 |
155 fn new(config: &Config, handle: &Handle) -> Self { | 161 fn new(config: Config, handle: Handle) -> Self { |
156 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 | 162 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 |
157 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b); | 163 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b); |
158 let epoch = base64::encode(&b); | 164 let epoch = base64::encode(&b); |
165 | |
159 ParamWaiter { | 166 ParamWaiter { |
160 limitlog: NotTooOften::new(LOG_MINUTES*60), | 167 limitlog: NotTooOften::new(LOG_MINUTES*60), |
161 last_etag: RefCell::new(String::new()), | 168 last_etag: RefCell::new(String::new()), |
162 epoch: epoch, | 169 epoch: epoch, |
163 config: config.clone(), | 170 config: config, |
164 handle: handle.clone(), | 171 handle: handle, |
165 } | 172 } |
166 } | 173 } |
167 | 174 |
168 pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { | |
169 let rcself = Rc::new(ParamWaiter::new(config, handle)); | |
170 | |
171 let dur = Duration::from_millis(4000); | |
172 let i = Interval::new(dur, &rcself.handle).unwrap() | |
173 .map_err(|e| TemplogError::new_io("interval failed", e)) | |
174 .and_then(move |()| { | |
175 Self::step(rcself.clone()) | |
176 }); | |
177 | |
178 // TODO use consume_errors() instead once "impl trait" is stable | |
179 //Box::new(consume_errors(i)) | |
180 let f = i.then(|r| { | |
181 match r { | |
182 Ok(v) => Ok(Some(v)), | |
183 Err(e) => { | |
184 debug!("Params stream error: {}", e); | |
185 Ok(None) | |
186 } | |
187 } | |
188 }) | |
189 .filter_map(|p| p); | |
190 Box::new(f) | |
191 } | |
192 } | 175 } |
193 | 176 |