comparison rust/src/params.rs @ 615:f153aec221be rust

move Params, epoch code
author Matt Johnston <matt@ucc.asn.au>
date Tue, 07 Mar 2017 23:56:12 +0800
parents rust/src/paramwaiter.rs@e1bab5b36352
children a85c0c9bc1fa
comparison
equal deleted inserted replaced
614:e1bab5b36352 615:f153aec221be
1 extern crate tokio_core;
2 extern crate futures;
3 extern crate rand;
4 extern crate serde_json;
5
6 use std::time::Duration;
7 use std::io;
8 use std::str;
9 use std::rc::Rc;
10 use std::sync::{Arc,Mutex};
11 use std::error::Error;
12 use std::cell::{Cell,RefCell};
13
14 use tokio_core::reactor::Interval;
15 use tokio_core::reactor::Handle;
16 use tokio_curl::Session;
17 use futures::{Stream,Future,future};
18 use curl::easy::Easy;
19 use curl::easy;
20
21 use types::*;
22 use ::Config;
23
24 #[derive(Deserialize, Serialize, Debug, Clone)]
25 pub struct Params {
26 pub fridge_setpoint: f32,
27 pub fridge_difference: f32,
28 pub overshoot_delay: u32,
29 pub overshoot_factor: f32,
30 pub disabled: bool,
31 pub nowort: bool,
32 pub fridge_range_lower: f32,
33 pub fridge_range_upper: f32,
34
35 #[serde(skip_serializing)]
36 pub epoch: String,
37 }
38
39 impl Params {
40 pub fn defaults() -> Params {
41 Params {
42 fridge_setpoint: 16.0,
43 fridge_difference: 0.2,
44 overshoot_delay: 720, // 12 minutes
45 overshoot_factor: 1.0,
46 disabled: false,
47 nowort: false,
48 fridge_range_lower: 3.0,
49 fridge_range_upper: 3.0,
50 epoch: String::new(),
51 }
52 }
53
54 pub fn load(config: &Config) -> Params {
55 // generate random epoch on success.
56 // TODO: return failure? or just return default() ?
57 unimplemented!();
58 }
59 }
60
61 #[derive(Deserialize, Debug)]
62 struct Response {
63 epoch_tag: String,
64 params: Params,
65 }
66
67 #[derive(Clone)]
68 pub struct ParamWaiter {
69 limitlog: NotTooOften,
70 epoch_tag: RefCell<String>,
71 session: RefCell<Option<Session>>,
72
73 config: Config,
74 handle: Handle,
75 }
76
/// Minutes used to build the `NotTooOften` limiter (passed as `LOG_MINUTES * 60`).
const LOG_MINUTES: u64 = 15;
/// Largest response body, in bytes, accepted from the settings server.
const MAX_RESPONSE_SIZE: usize = 10_000;
/// Timeout, in minutes, applied to each settings request.
const TIMEOUT_MINUTES: u64 = 5;
80
81 impl ParamWaiter {
82 fn make_request(&self) -> Easy {
83 let mut req = Easy::new();
84 req.get(true).unwrap();
85 // supposedly req.url won't fail, checking is later?
86 req.url(&self.config.SETTINGS_URL).unwrap();
87
88 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
89
90 // http header
91 // etag: epoch-tag
92 let e = self.epoch_tag.borrow();
93 if !e.is_empty() {
94 let mut list = easy::List::new();
95 let hd = format!("etag: {}", *e);
96 list.append(&hd).unwrap();
97 req.http_headers(list).unwrap();
98 }
99
100 req
101 }
102
103 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
104 let text = String::from_utf8_lossy(buf).to_string();
105 let resp = req.response_code()?;
106 match resp {
107 200 => {
108 // new params
109 let r: Response = serde_json::from_str(&text)?;
110 *self.epoch_tag.borrow_mut() = r.epoch_tag;
111 Ok(Some(r.params))
112 }
113 304 => Ok(None), // unmodified (long polling timeout at the server)
114 _ => {
115 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
116 },
117 }
118 }
119
120 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
121
122 if rself.session.borrow().is_none() {
123 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
124 }
125 let ses = rself.session.borrow().clone().unwrap();
126
127 let mut req = rself.make_request();
128 let buf = Arc::new(Mutex::new(Vec::new()));
129
130 let dst = buf.clone();
131 req.write_function(move |data| {
132 let mut dst = dst.lock().unwrap();
133 dst.extend_from_slice(data);
134 if dst.len() > MAX_RESPONSE_SIZE {
135 debug!("Too large params response from server: {}", dst.len());
136 Ok(0)
137 } else {
138 Ok(data.len())
139 }
140 }).unwrap();
141
142 let s = ses.perform(req)
143 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
144 .and_then(move |mut rq| {
145 let result = buf.lock().unwrap();
146 rself.handle_response(&mut rq, &result)
147 });
148 Box::new(s)
149
150 /*
151 let mut p = Params::defaults();
152 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
153 future::ok(Some(p)).boxed()
154 */
155 }
156
157 fn new(config: &Config, epoch: String, handle: &Handle) -> Self {
158 ParamWaiter {
159 limitlog: NotTooOften::new(LOG_MINUTES*60),
160 epoch_tag: RefCell::new(epoch),
161 session: RefCell::new(None),
162 config: config.clone(),
163 handle: handle.clone(),
164 }
165 }
166
167 pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
168 let rcself = Rc::new(ParamWaiter::new(config, epoch, handle));
169
170 let dur = Duration::from_millis(4000);
171 let i = Interval::new(dur, &rcself.handle).unwrap()
172 .map_err(|e| TemplogError::new_io("interval failed", e))
173 .and_then(move |()| {
174 Self::step(rcself.clone())
175 })
176 // throw away None params
177 .filter_map(|p| p);
178 Box::new(i)
179 }
180 }
181