comparison rust/src/params.rs @ 616:a85c0c9bc1fa rust

hide epoch in ParamWaiter; make_request handles the buffer too
author Matt Johnston <matt@ucc.asn.au>
date Wed, 08 Mar 2017 23:08:14 +0800
parents f153aec221be
children 2d65a9f0bed3
comparison
equal deleted inserted replaced
615:f153aec221be 616:a85c0c9bc1fa
1 extern crate tokio_core; 1 extern crate tokio_core;
2 extern crate futures; 2 extern crate futures;
3 extern crate rand; 3 extern crate rand;
4 extern crate serde_json; 4 extern crate serde_json;
5 extern crate base64;
5 6
6 use std::time::Duration; 7 use std::time::Duration;
7 use std::io; 8 use std::io;
8 use std::str; 9 use std::str;
9 use std::rc::Rc; 10 use std::rc::Rc;
15 use tokio_core::reactor::Handle; 16 use tokio_core::reactor::Handle;
16 use tokio_curl::Session; 17 use tokio_curl::Session;
17 use futures::{Stream,Future,future}; 18 use futures::{Stream,Future,future};
18 use curl::easy::Easy; 19 use curl::easy::Easy;
19 use curl::easy; 20 use curl::easy;
21 use self::rand::Rng;
20 22
21 use types::*; 23 use types::*;
22 use ::Config; 24 use ::Config;
23 25
24 #[derive(Deserialize, Serialize, Debug, Clone)] 26 #[derive(Deserialize, Serialize, Debug, Clone)]
29 pub overshoot_factor: f32, 31 pub overshoot_factor: f32,
30 pub disabled: bool, 32 pub disabled: bool,
31 pub nowort: bool, 33 pub nowort: bool,
32 pub fridge_range_lower: f32, 34 pub fridge_range_lower: f32,
33 pub fridge_range_upper: f32, 35 pub fridge_range_upper: f32,
34
35 #[serde(skip_serializing)]
36 pub epoch: String,
37 } 36 }
38 37
39 impl Params { 38 impl Params {
40 pub fn defaults() -> Params { 39 pub fn defaults() -> Params {
41 Params { 40 Params {
45 overshoot_factor: 1.0, 44 overshoot_factor: 1.0,
46 disabled: false, 45 disabled: false,
47 nowort: false, 46 nowort: false,
48 fridge_range_lower: 3.0, 47 fridge_range_lower: 3.0,
49 fridge_range_upper: 3.0, 48 fridge_range_upper: 3.0,
50 epoch: String::new(),
51 } 49 }
52 } 50 }
53 51
54 pub fn load(config: &Config) -> Params { 52 pub fn load(config: &Config) -> Params {
55 // generate random epoch on success. 53 // generate random epoch on success.
56 // TODO: return failure? or just return default() ? 54 // TODO: return failure? or just return default() ?
55 let mut p = Params::defaults();
56
57 unimplemented!(); 57 unimplemented!();
58 } 58 }
59 } 59 }
60 60
61 #[derive(Deserialize, Debug)] 61 #[derive(Deserialize, Debug)]
62 struct Response { 62 struct Response {
63 epoch_tag: String, 63 // sent as an opaque etag: header. Has format "epoch-nonce",
64 // responses where the epoch does not match ParamWaiter::epoch are dropped
65 etag: String,
64 params: Params, 66 params: Params,
65 } 67 }
66 68
67 #[derive(Clone)] 69 #[derive(Clone)]
68 pub struct ParamWaiter { 70 pub struct ParamWaiter {
69 limitlog: NotTooOften, 71 limitlog: NotTooOften,
70 epoch_tag: RefCell<String>, 72 // last_etag is used for long-polling.
73 last_etag: RefCell<String>,
71 session: RefCell<Option<Session>>, 74 session: RefCell<Option<Session>>,
75 epoch: String,
72 76
73 config: Config, 77 config: Config,
74 handle: Handle, 78 handle: Handle,
75 } 79 }
76 80
77 const LOG_MINUTES: u64 = 15; 81 const LOG_MINUTES: u64 = 15;
78 const MAX_RESPONSE_SIZE: usize = 10000; 82 const MAX_RESPONSE_SIZE: usize = 10000;
79 const TIMEOUT_MINUTES: u64 = 5; 83 const TIMEOUT_MINUTES: u64 = 5;
80 84
81 impl ParamWaiter { 85 impl ParamWaiter {
82 fn make_request(&self) -> Easy { 86 fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) {
83 let mut req = Easy::new(); 87 let mut req = Easy::new();
84 req.get(true).unwrap(); 88 req.get(true).unwrap();
85 // supposedly req.url won't fail, checking is later? 89 // supposedly req.url won't fail, checking happens later?
86 req.url(&self.config.SETTINGS_URL).unwrap(); 90 req.url(&self.config.SETTINGS_URL).unwrap();
87 91
88 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); 92 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
89 93
94 // store response
95 let max_response_size = 10000;
96 let buf = Arc::new(Mutex::new(Vec::new()));
97 let dst = buf.clone();
98 req.write_function(move |data| {
99 let mut dst = dst.lock().unwrap();
100 dst.extend_from_slice(data);
101 if dst.len() > max_response_size {
102 error!("Too large params response from server: {}", dst.len());
103 Ok(0)
104 } else {
105 Ok(data.len())
106 }
107 }).unwrap();
108
90 // http header 109 // http header
91 // etag: epoch-tag 110 let e = self.last_etag.borrow();
92 let e = self.epoch_tag.borrow();
93 if !e.is_empty() { 111 if !e.is_empty() {
94 let mut list = easy::List::new(); 112 let mut list = easy::List::new();
95 let hd = format!("etag: {}", *e); 113 let hd = format!("etag: {}", *e);
96 list.append(&hd).unwrap(); 114 list.append(&hd).unwrap();
97 req.http_headers(list).unwrap(); 115 req.http_headers(list).unwrap();
98 } 116 }
99 117
100 req 118 (req, buf)
101 } 119 }
102 120
103 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { 121 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
104 let text = String::from_utf8_lossy(buf).to_string(); 122 let text = String::from_utf8_lossy(buf).to_string();
105 let resp = req.response_code()?; 123 let resp = req.response_code()?;
106 match resp { 124 match resp {
107 200 => { 125 200 => {
108 // new params 126 // new params
109 let r: Response = serde_json::from_str(&text)?; 127 let r: Response = serde_json::from_str(&text)?;
110 *self.epoch_tag.borrow_mut() = r.epoch_tag; 128 *self.last_etag.borrow_mut() = r.etag;
111 Ok(Some(r.params)) 129 Ok(Some(r.params))
112 } 130 }
113 304 => Ok(None), // unmodified (long polling timeout at the server) 131 304 => Ok(None), // unmodified (long polling timeout at the server)
114 _ => { 132 _ => {
115 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) 133 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
120 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { 138 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
121 139
122 if rself.session.borrow().is_none() { 140 if rself.session.borrow().is_none() {
123 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); 141 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
124 } 142 }
143
144 let (mut req, buf) = rself.make_request();
145
125 let ses = rself.session.borrow().clone().unwrap(); 146 let ses = rself.session.borrow().clone().unwrap();
126
127 let mut req = rself.make_request();
128 let buf = Arc::new(Mutex::new(Vec::new()));
129
130 let dst = buf.clone();
131 req.write_function(move |data| {
132 let mut dst = dst.lock().unwrap();
133 dst.extend_from_slice(data);
134 if dst.len() > MAX_RESPONSE_SIZE {
135 debug!("Too large params response from server: {}", dst.len());
136 Ok(0)
137 } else {
138 Ok(data.len())
139 }
140 }).unwrap();
141
142 let s = ses.perform(req) 147 let s = ses.perform(req)
143 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) 148 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
144 .and_then(move |mut rq| { 149 .and_then(move |mut rq| {
145 let result = buf.lock().unwrap(); 150 let result = buf.lock().unwrap();
146 rself.handle_response(&mut rq, &result) 151 rself.handle_response(&mut rq, &result)
152 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); 157 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
153 future::ok(Some(p)).boxed() 158 future::ok(Some(p)).boxed()
154 */ 159 */
155 } 160 }
156 161
157 fn new(config: &Config, epoch: String, handle: &Handle) -> Self { 162 fn new(config: &Config, handle: &Handle) -> Self {
163 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
164 rand::OsRng::new().unwrap().fill_bytes(&mut b);
165 let epoch = base64::encode(&b);
158 ParamWaiter { 166 ParamWaiter {
159 limitlog: NotTooOften::new(LOG_MINUTES*60), 167 limitlog: NotTooOften::new(LOG_MINUTES*60),
160 epoch_tag: RefCell::new(epoch), 168 last_etag: RefCell::new(String::new()),
161 session: RefCell::new(None), 169 session: RefCell::new(None),
170 epoch: epoch,
162 config: config.clone(), 171 config: config.clone(),
163 handle: handle.clone(), 172 handle: handle.clone(),
164 } 173 }
165 } 174 }
166 175
167 pub fn stream(config: &Config, epoch: String, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { 176 pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
168 let rcself = Rc::new(ParamWaiter::new(config, epoch, handle)); 177 let rcself = Rc::new(ParamWaiter::new(config, handle));
169 178
170 let dur = Duration::from_millis(4000); 179 let dur = Duration::from_millis(4000);
171 let i = Interval::new(dur, &rcself.handle).unwrap() 180 let i = Interval::new(dur, &rcself.handle).unwrap()
172 .map_err(|e| TemplogError::new_io("interval failed", e)) 181 .map_err(|e| TemplogError::new_io("interval failed", e))
173 .and_then(move |()| { 182 .and_then(move |()| {