Mercurial > templog
comparison rust/src/params.rs @ 616:a85c0c9bc1fa rust
hide epoch in ParamWaiter
make_request handles the buffer too
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 08 Mar 2017 23:08:14 +0800 |
parents | f153aec221be |
children | 2d65a9f0bed3 |
comparison
equal
deleted
inserted
replaced
615:f153aec221be | 616:a85c0c9bc1fa |
---|---|
1 extern crate tokio_core; | 1 extern crate tokio_core; |
2 extern crate futures; | 2 extern crate futures; |
3 extern crate rand; | 3 extern crate rand; |
4 extern crate serde_json; | 4 extern crate serde_json; |
5 extern crate base64; | |
5 | 6 |
6 use std::time::Duration; | 7 use std::time::Duration; |
7 use std::io; | 8 use std::io; |
8 use std::str; | 9 use std::str; |
9 use std::rc::Rc; | 10 use std::rc::Rc; |
15 use tokio_core::reactor::Handle; | 16 use tokio_core::reactor::Handle; |
16 use tokio_curl::Session; | 17 use tokio_curl::Session; |
17 use futures::{Stream,Future,future}; | 18 use futures::{Stream,Future,future}; |
18 use curl::easy::Easy; | 19 use curl::easy::Easy; |
19 use curl::easy; | 20 use curl::easy; |
21 use self::rand::Rng; | |
20 | 22 |
21 use types::*; | 23 use types::*; |
22 use ::Config; | 24 use ::Config; |
23 | 25 |
24 #[derive(Deserialize, Serialize, Debug, Clone)] | 26 #[derive(Deserialize, Serialize, Debug, Clone)] |
29 pub overshoot_factor: f32, | 31 pub overshoot_factor: f32, |
30 pub disabled: bool, | 32 pub disabled: bool, |
31 pub nowort: bool, | 33 pub nowort: bool, |
32 pub fridge_range_lower: f32, | 34 pub fridge_range_lower: f32, |
33 pub fridge_range_upper: f32, | 35 pub fridge_range_upper: f32, |
34 | |
35 #[serde(skip_serializing)] | |
36 pub epoch: String, | |
37 } | 36 } |
38 | 37 |
39 impl Params { | 38 impl Params { |
40 pub fn defaults() -> Params { | 39 pub fn defaults() -> Params { |
41 Params { | 40 Params { |
45 overshoot_factor: 1.0, | 44 overshoot_factor: 1.0, |
46 disabled: false, | 45 disabled: false, |
47 nowort: false, | 46 nowort: false, |
48 fridge_range_lower: 3.0, | 47 fridge_range_lower: 3.0, |
49 fridge_range_upper: 3.0, | 48 fridge_range_upper: 3.0, |
50 epoch: String::new(), | |
51 } | 49 } |
52 } | 50 } |
53 | 51 |
54 pub fn load(config: &Config) -> Params { | 52 pub fn load(config: &Config) -> Params { |
55 // generate random epoch on success. | 53 // generate random epoch on success. |
56 // TODO: return failure? or just return default() ? | 54 // TODO: return failure? or just return default() ? |
55 let mut p = Params::defaults(); | |
56 | |
57 unimplemented!(); | 57 unimplemented!(); |
58 } | 58 } |
59 } | 59 } |
60 | 60 |
61 #[derive(Deserialize, Debug)] | 61 #[derive(Deserialize, Debug)] |
62 struct Response { | 62 struct Response { |
63 epoch_tag: String, | 63 // sent as an opaque etag: header. Has format "epoch-nonce", |
64 // responses where the epoch do not match ParamWaiter::epoch are dropped | |
65 etag: String, | |
64 params: Params, | 66 params: Params, |
65 } | 67 } |
66 | 68 |
67 #[derive(Clone)] | 69 #[derive(Clone)] |
68 pub struct ParamWaiter { | 70 pub struct ParamWaiter { |
69 limitlog: NotTooOften, | 71 limitlog: NotTooOften, |
70 epoch_tag: RefCell<String>, | 72 // last_etag is used for long-polling. |
73 last_etag: RefCell<String>, | |
71 session: RefCell<Option<Session>>, | 74 session: RefCell<Option<Session>>, |
75 epoch: String, | |
72 | 76 |
73 config: Config, | 77 config: Config, |
74 handle: Handle, | 78 handle: Handle, |
75 } | 79 } |
76 | 80 |
77 const LOG_MINUTES: u64 = 15; | 81 const LOG_MINUTES: u64 = 15; |
78 const MAX_RESPONSE_SIZE: usize = 10000; | 82 const MAX_RESPONSE_SIZE: usize = 10000; |
79 const TIMEOUT_MINUTES: u64 = 5; | 83 const TIMEOUT_MINUTES: u64 = 5; |
80 | 84 |
81 impl ParamWaiter { | 85 impl ParamWaiter { |
82 fn make_request(&self) -> Easy { | 86 fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) { |
83 let mut req = Easy::new(); | 87 let mut req = Easy::new(); |
84 req.get(true).unwrap(); | 88 req.get(true).unwrap(); |
85 // supposedly req.url won't fail, checking is later? | 89 // supposedly req.url won't fail, checking happens later? |
86 req.url(&self.config.SETTINGS_URL).unwrap(); | 90 req.url(&self.config.SETTINGS_URL).unwrap(); |
87 | 91 |
88 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); | 92 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); |
89 | 93 |
94 // store response | |
95 let max_response_size = 10000; | |
96 let buf = Arc::new(Mutex::new(Vec::new())); | |
97 let dst = buf.clone(); | |
98 req.write_function(move |data| { | |
99 let mut dst = dst.lock().unwrap(); | |
100 dst.extend_from_slice(data); | |
101 if dst.len() > max_response_size { | |
102 error!("Too large params response from server: {}", dst.len()); | |
103 Ok(0) | |
104 } else { | |
105 Ok(data.len()) | |
106 } | |
107 }).unwrap(); | |
108 | |
90 // http header | 109 // http header |
91 // etag: epoch-tag | 110 let e = self.last_etag.borrow(); |
92 let e = self.epoch_tag.borrow(); | |
93 if !e.is_empty() { | 111 if !e.is_empty() { |
94 let mut list = easy::List::new(); | 112 let mut list = easy::List::new(); |
95 let hd = format!("etag: {}", *e); | 113 let hd = format!("etag: {}", *e); |
96 list.append(&hd).unwrap(); | 114 list.append(&hd).unwrap(); |
97 req.http_headers(list).unwrap(); | 115 req.http_headers(list).unwrap(); |
98 } | 116 } |
99 | 117 |
100 req | 118 (req, buf) |
101 } | 119 } |
102 | 120 |
103 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { | 121 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { |
104 let text = String::from_utf8_lossy(buf).to_string(); | 122 let text = String::from_utf8_lossy(buf).to_string(); |
105 let resp = req.response_code()?; | 123 let resp = req.response_code()?; |
106 match resp { | 124 match resp { |
107 200 => { | 125 200 => { |
108 // new params | 126 // new params |
109 let r: Response = serde_json::from_str(&text)?; | 127 let r: Response = serde_json::from_str(&text)?; |
110 *self.epoch_tag.borrow_mut() = r.epoch_tag; | 128 *self.last_etag.borrow_mut() = r.etag; |
111 Ok(Some(r.params)) | 129 Ok(Some(r.params)) |
112 } | 130 } |
113 304 => Ok(None), // unmodified (long polling timeout at the server) | 131 304 => Ok(None), // unmodified (long polling timeout at the server) |
114 _ => { | 132 _ => { |
115 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) | 133 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) |
120 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { | 138 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { |
121 | 139 |
122 if rself.session.borrow().is_none() { | 140 if rself.session.borrow().is_none() { |
123 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); | 141 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); |
124 } | 142 } |
143 | |
144 let (mut req, buf) = rself.make_request(); | |
145 | |
125 let ses = rself.session.borrow().clone().unwrap(); | 146 let ses = rself.session.borrow().clone().unwrap(); |
126 | |
127 let mut req = rself.make_request(); | |
128 let buf = Arc::new(Mutex::new(Vec::new())); | |
129 | |
130 let dst = buf.clone(); | |
131 req.write_function(move |data| { | |
132 let mut dst = dst.lock().unwrap(); | |
133 dst.extend_from_slice(data); | |
134 if dst.len() > MAX_RESPONSE_SIZE { | |
135 debug!("Too large params response from server: {}", dst.len()); | |
136 Ok(0) | |
137 } else { | |
138 Ok(data.len()) | |
139 } | |
140 }).unwrap(); | |
141 | |
142 let s = ses.perform(req) | 147 let s = ses.perform(req) |
143 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) | 148 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) |
144 .and_then(move |mut rq| { | 149 .and_then(move |mut rq| { |
145 let result = buf.lock().unwrap(); | 150 let result = buf.lock().unwrap(); |
146 rself.handle_response(&mut rq, &result) | 151 rself.handle_response(&mut rq, &result) |
152 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); | 157 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); |
153 future::ok(Some(p)).boxed() | 158 future::ok(Some(p)).boxed() |
154 */ | 159 */ |
155 } | 160 } |
156 | 161 |
157 fn new(config: &Config, epoch: String, handle: &Handle) -> Self { | 162 fn new(config: &Config, handle: &Handle) -> Self { |
163 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 | |
164 rand::OsRng::new().unwrap().fill_bytes(&mut b); | |
165 let epoch = base64::encode(&b); | |
158 ParamWaiter { | 166 ParamWaiter { |
159 limitlog: NotTooOften::new(LOG_MINUTES*60), | 167 limitlog: NotTooOften::new(LOG_MINUTES*60), |
160 epoch_tag: RefCell::new(epoch), | 168 last_etag: RefCell::new(String::new()), |
161 session: RefCell::new(None), | 169 session: RefCell::new(None), |
170 epoch: epoch, | |
162 config: config.clone(), | 171 config: config.clone(), |
163 handle: handle.clone(), | 172 handle: handle.clone(), |
164 } | 173 } |
165 } | 174 } |
166 | 175 |
167 pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { | 176 pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { |
168 let rcself = Rc::new(ParamWaiter::new(config, epoch, handle)); | 177 let rcself = Rc::new(ParamWaiter::new(config, handle)); |
169 | 178 |
170 let dur = Duration::from_millis(4000); | 179 let dur = Duration::from_millis(4000); |
171 let i = Interval::new(dur, &rcself.handle).unwrap() | 180 let i = Interval::new(dur, &rcself.handle).unwrap() |
172 .map_err(|e| TemplogError::new_io("interval failed", e)) | 181 .map_err(|e| TemplogError::new_io("interval failed", e)) |
173 .and_then(move |()| { | 182 .and_then(move |()| { |