Mercurial > templog
comparison rust/src/params.rs @ 618:2d65a9f0bed3 rust
params returning stream of success
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 21 Mar 2017 22:35:58 +0800 |
parents | a85c0c9bc1fa |
children | 8136a6b99866 |
comparison
equal
deleted
inserted
replaced
617:87a78343140e | 618:2d65a9f0bed3 |
---|---|
116 } | 116 } |
117 | 117 |
118 (req, buf) | 118 (req, buf) |
119 } | 119 } |
120 | 120 |
121 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { | 121 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Params, TemplogError> { |
122 let text = String::from_utf8_lossy(buf).to_string(); | 122 let text = String::from_utf8_lossy(buf).to_string(); |
123 let resp = req.response_code()?; | 123 let resp = req.response_code()?; |
124 match resp { | 124 match resp { |
125 200 => { | 125 200 => { |
126 // new params | 126 // new params |
127 let r: Response = serde_json::from_str(&text)?; | 127 let r: Response = serde_json::from_str(&text)?; |
128 *self.last_etag.borrow_mut() = r.etag; | 128 let mut le = self.last_etag.borrow_mut(); |
129 Ok(Some(r.params)) | 129 *le = r.etag; |
130 } | 130 if le.split('-').next().unwrap() == &self.epoch { |
131 304 => Ok(None), // unmodified (long polling timeout at the server) | 131 Ok(r.params) |
132 } else { | |
133 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) | |
134 } | |
135 } | |
136 304 => { | |
137 // XXX this isn't really an error | |
138 Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) | |
139 }, | |
132 _ => { | 140 _ => { |
133 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) | 141 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) |
134 }, | 142 }, |
135 } | 143 } |
136 } | 144 } |
137 | 145 |
138 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { | 146 fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { |
139 | 147 |
140 if rself.session.borrow().is_none() { | 148 if rself.session.borrow().is_none() { |
141 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); | 149 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); |
142 } | 150 } |
143 | 151 |
149 .and_then(move |mut rq| { | 157 .and_then(move |mut rq| { |
150 let result = buf.lock().unwrap(); | 158 let result = buf.lock().unwrap(); |
151 rself.handle_response(&mut rq, &result) | 159 rself.handle_response(&mut rq, &result) |
152 }); | 160 }); |
153 Box::new(s) | 161 Box::new(s) |
154 | |
155 /* | |
156 let mut p = Params::defaults(); | |
157 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); | |
158 future::ok(Some(p)).boxed() | |
159 */ | |
160 } | 162 } |
161 | 163 |
162 fn new(config: &Config, handle: &Handle) -> Self { | 164 fn new(config: &Config, handle: &Handle) -> Self { |
163 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 | 165 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 |
164 rand::OsRng::new().unwrap().fill_bytes(&mut b); | 166 rand::OsRng::new().unwrap().fill_bytes(&mut b); |
179 let dur = Duration::from_millis(4000); | 181 let dur = Duration::from_millis(4000); |
180 let i = Interval::new(dur, &rcself.handle).unwrap() | 182 let i = Interval::new(dur, &rcself.handle).unwrap() |
181 .map_err(|e| TemplogError::new_io("interval failed", e)) | 183 .map_err(|e| TemplogError::new_io("interval failed", e)) |
182 .and_then(move |()| { | 184 .and_then(move |()| { |
183 Self::step(rcself.clone()) | 185 Self::step(rcself.clone()) |
184 }) | 186 }); |
185 // throw away None params | 187 |
186 .filter_map(|p| p); | 188 // TODO use consume_errors() instead once "impl trait" is stable |
187 Box::new(i) | 189 //Box::new(consume_errors(i)) |
188 } | 190 let f = i.then(|r| { |
189 } | 191 match r { |
190 | 192 Ok(v) => Ok(Some(v)), |
193 Err(e) => { | |
194 debug!("Params stream error: {}", e); | |
195 Ok(None) | |
196 } | |
197 } | |
198 }) | |
199 .filter_map(|p| p); | |
200 Box::new(f) | |
201 } | |
202 } | |
203 |