Mercurial > templog
comparison rust/src/paramwaiter.rs @ 611:f3e39e2107fd rust
still doesn't compile, improvements to TemplogError and tokio curl though
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 28 Feb 2017 22:58:47 +0800 |
parents | 7bda01659426 |
children | 5c2b0d47bb83 |
comparison
equal
deleted
inserted
replaced
610:af0dac00d40b | 611:f3e39e2107fd |
---|---|
1 extern crate tokio_core; | 1 extern crate tokio_core; |
2 extern crate futures; | 2 extern crate futures; |
3 extern crate rand; | 3 extern crate rand; |
| | 4 extern crate serde_json; |
4 | 5 |
5 use std::time::Duration; | 6 use std::time::Duration; |
6 use std::io; | 7 use std::io; |
| | 8 use std::str; |
| | 9 use std::rc::Rc; |
| | 10 use std::sync::{Arc,Mutex}; |
| | 11 use std::error::Error; |
7 | 12 |
8 use tokio_core::reactor::Interval; | 13 use tokio_core::reactor::Interval; |
9 use tokio_core::reactor::Handle; | 14 use tokio_core::reactor::Handle; |
10 use tokio_curl::Session; | 15 use tokio_curl::Session; |
11 use futures::{Stream,Future,future}; | 16 use futures::{Stream,Future,future}; |
| | 17 use curl::easy::Easy; |
| | 18 |
12 use types::*; | 19 use types::*; |
13 use curl::Easy; | |
14 use ::Config; | 20 use ::Config; |
15 | 21 |
| | 22 #[derive(Clone)] |
16 pub struct ParamWaiter { | 23 pub struct ParamWaiter { |
17 limitlog: NotTooOften, | 24 limitlog: NotTooOften, |
18 epoch_tag: String, | 25 epoch_tag: String, |
19 session: Option<Session>, | 26 session: Option<Session>, |
20 config: Config, | 27 config: Config, |
| | 28 handle: Handle, |
21 } | 29 } |
22 | 30 |
23 const LOGMINUTES: u64 = 15; | 31 const LOG_MINUTES: u64 = 15; |
| | 32 const MAX_RESPONSE_SIZE: usize = 10000; |
24 | 33 |
25 impl ParamWaiter { | 34 impl ParamWaiter { |
26 pub fn new(config: &Config) -> Self { | 35 pub fn new(config: &Config, handle: &Handle) -> Self { |
27 ParamWaiter { | 36 ParamWaiter { |
28 limitlog: NotTooOften::new(LOGMINUTES*60), | 37 limitlog: NotTooOften::new(LOG_MINUTES*60), |
29 epoch_tag: String::new(), | 38 epoch_tag: String::new(), |
30 session: None, | 39 session: None, |
31 config: config.clone(), | 40 config: config.clone(), |
| | 41 handle: handle.clone(), |
32 } | 42 } |
33 } | 43 } |
34 | 44 |
35 fn make_req(&self) -> Easy { | 45 fn make_req(&self) -> Easy { |
36 let mut req = Easy::new(); | 46 let mut req = Easy::new(); |
37 req.get(true).unwrap(); | 47 req.get(true).unwrap(); |
38 req.url(config.SETTINGS_URL); | 48 // supposedly req.url won't fail, checking is later? |
| | 49 req.url(&self.config.SETTINGS_URL).unwrap(); |
| | 50 req |
39 } | 51 } |
40 | 52 |
41 fn step(&mut self, handle: &Handle) -> Box<Future<Item=Option<Params>, Error=io::Error>> { | 53 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { |
| | 54 let text = String::from_utf8_lossy(buf).to_string(); |
| | 55 let resp = req.response_code()?; |
| | 56 match resp { |
| | 57 200 => Ok(Some(serde_json::from_str(&text)?)), // new params |
| | 58 304 => Ok(None), // unmodified (long polling timeout at the server) |
| | 59 _ => { |
| | 60 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) |
| | 61 }, |
| | 62 } |
| | 63 } |
| | 64 |
| | 65 fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { |
42 if self.session.is_none() { | 66 if self.session.is_none() { |
43 self.session = Some(Session::new(handle.clone())) | 67 self.session = Some(Session::new(self.handle.clone())) |
44 } | 68 } |
45 | 69 |
46 let req = self.make_req(); | 70 let mut req = self.make_req(); |
47 /* | 71 let buf = Arc::new(Mutex::new(Vec::new())); |
48 self.session.unwrap().perform(self.make_req()) | |
49 .and_then(||) | |
50 */ | |
51 | 72 |
| | 73 let dst = buf.clone(); |
| | 74 req.write_function(move |data| { |
| | 75 let mut dst = dst.lock().unwrap(); |
| | 76 dst.extend_from_slice(data); |
| | 77 if dst.len() > MAX_RESPONSE_SIZE { |
| | 78 debug!("Too large params response from server: {}", dst.len()); |
| | 79 Ok(0) |
| | 80 } else { |
| | 81 Ok(data.len()) |
| | 82 } |
| | 83 }).unwrap(); |
| | 84 |
| | 85 // XXX too many clones |
| | 86 let se = self.clone(); |
| | 87 let s = self.clone().session.unwrap().perform(req) |
| | 88 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) |
| | 89 .and_then(move |mut es| { |
| | 90 let result = buf.lock().unwrap(); |
| | 91 se.handle_response(&mut es, &result) |
| | 92 }); |
| | 93 Box::new(s) |
| | 94 |
| | 95 /* |
52 let mut p = Params::defaults(); | 96 let mut p = Params::defaults(); |
53 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); | 97 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); |
54 future::ok(p).boxed() | 98 future::ok(Some(p)).boxed() |
| | 99 */ |
55 } | 100 } |
56 | 101 |
57 pub fn stream(&mut self, handle: &Handle) -> Box<Stream<Item=Params, Error=io::Error>> { | 102 pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> { |
58 | 103 |
59 let dur = Duration::from_millis(4000); | 104 let dur = Duration::from_millis(4000); |
60 let i = Interval::new(dur, handle).unwrap() | 105 let mut s = Rc::new(self.clone()); |
| | 106 let i = Interval::new(dur, &self.handle).unwrap() |
| | 107 .map_err(|e| TemplogError::new_io("interval failed", e)) |
61 .and_then(move |()| { | 108 .and_then(move |()| { |
62 s.step() | 109 let ss = Rc::get_mut(&mut s).unwrap(); |
| | 110 ss.step() |
63 }) | 111 }) |
64 // throw away None params | 112 // throw away None params |
65 .filter_map(|p| p); | 113 .filter_map(|p| p); |
66 Box::new(i) | 114 Box::new(i) |
67 } | 115 } |