Mercurial > templog
comparison rust/src/params.rs @ 615:f153aec221be rust
move Params, epoch code
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 07 Mar 2017 23:56:12 +0800 |
parents | rust/src/paramwaiter.rs@e1bab5b36352 |
children | a85c0c9bc1fa |
comparison
equal
deleted
inserted
replaced
614:e1bab5b36352 | 615:f153aec221be |
---|---|
1 extern crate tokio_core; | |
2 extern crate futures; | |
3 extern crate rand; | |
4 extern crate serde_json; | |
5 | |
6 use std::time::Duration; | |
7 use std::io; | |
8 use std::str; | |
9 use std::rc::Rc; | |
10 use std::sync::{Arc,Mutex}; | |
11 use std::error::Error; | |
12 use std::cell::{Cell,RefCell}; | |
13 | |
14 use tokio_core::reactor::Interval; | |
15 use tokio_core::reactor::Handle; | |
16 use tokio_curl::Session; | |
17 use futures::{Stream,Future,future}; | |
18 use curl::easy::Easy; | |
19 use curl::easy; | |
20 | |
21 use types::*; | |
22 use ::Config; | |
23 | |
24 #[derive(Deserialize, Serialize, Debug, Clone)] | |
25 pub struct Params { | |
26 pub fridge_setpoint: f32, | |
27 pub fridge_difference: f32, | |
28 pub overshoot_delay: u32, | |
29 pub overshoot_factor: f32, | |
30 pub disabled: bool, | |
31 pub nowort: bool, | |
32 pub fridge_range_lower: f32, | |
33 pub fridge_range_upper: f32, | |
34 | |
35 #[serde(skip_serializing)] | |
36 pub epoch: String, | |
37 } | |
38 | |
39 impl Params { | |
40 pub fn defaults() -> Params { | |
41 Params { | |
42 fridge_setpoint: 16.0, | |
43 fridge_difference: 0.2, | |
44 overshoot_delay: 720, // 12 minutes | |
45 overshoot_factor: 1.0, | |
46 disabled: false, | |
47 nowort: false, | |
48 fridge_range_lower: 3.0, | |
49 fridge_range_upper: 3.0, | |
50 epoch: String::new(), | |
51 } | |
52 } | |
53 | |
54 pub fn load(config: &Config) -> Params { | |
55 // generate random epoch on success. | |
56 // TODO: return failure? or just return default() ? | |
57 unimplemented!(); | |
58 } | |
59 } | |
60 | |
61 #[derive(Deserialize, Debug)] | |
62 struct Response { | |
63 epoch_tag: String, | |
64 params: Params, | |
65 } | |
66 | |
67 #[derive(Clone)] | |
68 pub struct ParamWaiter { | |
69 limitlog: NotTooOften, | |
70 epoch_tag: RefCell<String>, | |
71 session: RefCell<Option<Session>>, | |
72 | |
73 config: Config, | |
74 handle: Handle, | |
75 } | |
76 | |
77 const LOG_MINUTES: u64 = 15; | |
78 const MAX_RESPONSE_SIZE: usize = 10000; | |
79 const TIMEOUT_MINUTES: u64 = 5; | |
80 | |
81 impl ParamWaiter { | |
82 fn make_request(&self) -> Easy { | |
83 let mut req = Easy::new(); | |
84 req.get(true).unwrap(); | |
85 // supposedly req.url won't fail, checking is later? | |
86 req.url(&self.config.SETTINGS_URL).unwrap(); | |
87 | |
88 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); | |
89 | |
90 // http header | |
91 // etag: epoch-tag | |
92 let e = self.epoch_tag.borrow(); | |
93 if !e.is_empty() { | |
94 let mut list = easy::List::new(); | |
95 let hd = format!("etag: {}", *e); | |
96 list.append(&hd).unwrap(); | |
97 req.http_headers(list).unwrap(); | |
98 } | |
99 | |
100 req | |
101 } | |
102 | |
103 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { | |
104 let text = String::from_utf8_lossy(buf).to_string(); | |
105 let resp = req.response_code()?; | |
106 match resp { | |
107 200 => { | |
108 // new params | |
109 let r: Response = serde_json::from_str(&text)?; | |
110 *self.epoch_tag.borrow_mut() = r.epoch_tag; | |
111 Ok(Some(r.params)) | |
112 } | |
113 304 => Ok(None), // unmodified (long polling timeout at the server) | |
114 _ => { | |
115 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) | |
116 }, | |
117 } | |
118 } | |
119 | |
120 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { | |
121 | |
122 if rself.session.borrow().is_none() { | |
123 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); | |
124 } | |
125 let ses = rself.session.borrow().clone().unwrap(); | |
126 | |
127 let mut req = rself.make_request(); | |
128 let buf = Arc::new(Mutex::new(Vec::new())); | |
129 | |
130 let dst = buf.clone(); | |
131 req.write_function(move |data| { | |
132 let mut dst = dst.lock().unwrap(); | |
133 dst.extend_from_slice(data); | |
134 if dst.len() > MAX_RESPONSE_SIZE { | |
135 debug!("Too large params response from server: {}", dst.len()); | |
136 Ok(0) | |
137 } else { | |
138 Ok(data.len()) | |
139 } | |
140 }).unwrap(); | |
141 | |
142 let s = ses.perform(req) | |
143 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) | |
144 .and_then(move |mut rq| { | |
145 let result = buf.lock().unwrap(); | |
146 rself.handle_response(&mut rq, &result) | |
147 }); | |
148 Box::new(s) | |
149 | |
150 /* | |
151 let mut p = Params::defaults(); | |
152 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); | |
153 future::ok(Some(p)).boxed() | |
154 */ | |
155 } | |
156 | |
157 fn new(config: &Config, epoch: String, handle: &Handle) -> Self { | |
158 ParamWaiter { | |
159 limitlog: NotTooOften::new(LOG_MINUTES*60), | |
160 epoch_tag: RefCell::new(epoch), | |
161 session: RefCell::new(None), | |
162 config: config.clone(), | |
163 handle: handle.clone(), | |
164 } | |
165 } | |
166 | |
167 pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { | |
168 let rcself = Rc::new(ParamWaiter::new(config, epoch, handle)); | |
169 | |
170 let dur = Duration::from_millis(4000); | |
171 let i = Interval::new(dur, &rcself.handle).unwrap() | |
172 .map_err(|e| TemplogError::new_io("interval failed", e)) | |
173 .and_then(move |()| { | |
174 Self::step(rcself.clone()) | |
175 }) | |
176 // throw away None params | |
177 .filter_map(|p| p); | |
178 Box::new(i) | |
179 } | |
180 } | |
181 |