Mercurial > templog
comparison rust/src/paramwaiter.rs @ 614:e1bab5b36352 rust
using some refcells for the paramwaiter
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 07 Mar 2017 23:04:02 +0800 |
parents | 5c2b0d47bb83 |
children |
comparison
equal
deleted
inserted
replaced
613:5c2b0d47bb83 | 614:e1bab5b36352 |
---|---|
7 use std::io; | 7 use std::io; |
8 use std::str; | 8 use std::str; |
9 use std::rc::Rc; | 9 use std::rc::Rc; |
10 use std::sync::{Arc,Mutex}; | 10 use std::sync::{Arc,Mutex}; |
11 use std::error::Error; | 11 use std::error::Error; |
12 use std::cell::Cell; | 12 use std::cell::{Cell,RefCell}; |
13 | 13 |
14 use tokio_core::reactor::Interval; | 14 use tokio_core::reactor::Interval; |
15 use tokio_core::reactor::Handle; | 15 use tokio_core::reactor::Handle; |
16 use tokio_curl::Session; | 16 use tokio_curl::Session; |
17 use futures::{Stream,Future,future}; | 17 use futures::{Stream,Future,future}; |
18 use curl::easy::Easy; | 18 use curl::easy::Easy; |
19 use curl::easy; | |
19 | 20 |
20 use types::*; | 21 use types::*; |
21 use ::Config; | 22 use ::Config; |
22 | 23 |
23 #[derive(Deserialize, Debug)] | 24 #[derive(Deserialize, Debug)] |
27 } | 28 } |
28 | 29 |
29 #[derive(Clone)] | 30 #[derive(Clone)] |
30 pub struct ParamWaiter { | 31 pub struct ParamWaiter { |
31 limitlog: NotTooOften, | 32 limitlog: NotTooOften, |
32 epoch_tag: String, | 33 epoch_tag: RefCell<String>, |
33 session: Option<Session>, | 34 session: RefCell<Option<Session>>, |
35 | |
34 config: Config, | 36 config: Config, |
35 handle: Handle, | 37 handle: Handle, |
36 } | 38 } |
37 | 39 |
38 const LOG_MINUTES: u64 = 15; | 40 const LOG_MINUTES: u64 = 15; |
39 const MAX_RESPONSE_SIZE: usize = 10000; | 41 const MAX_RESPONSE_SIZE: usize = 10000; |
42 const TIMEOUT_MINUTES: u64 = 5; | |
40 | 43 |
41 impl ParamWaiter { | 44 impl ParamWaiter { |
42 pub fn new(config: &Config, handle: &Handle) -> Self { | 45 fn make_request(&self) -> Easy { |
43 ParamWaiter { | |
44 limitlog: NotTooOften::new(LOG_MINUTES*60), | |
45 epoch_tag: String::new(), | |
46 session: None, | |
47 config: config.clone(), | |
48 handle: handle.clone(), | |
49 } | |
50 } | |
51 | |
52 fn make_req(&self) -> Easy { | |
53 let mut req = Easy::new(); | 46 let mut req = Easy::new(); |
54 req.get(true).unwrap(); | 47 req.get(true).unwrap(); |
55 // supposedly req.url won't fail, checking is later? | 48 // supposedly req.url won't fail, checking is later? |
56 req.url(&self.config.SETTINGS_URL).unwrap(); | 49 req.url(&self.config.SETTINGS_URL).unwrap(); |
50 | |
51 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); | |
52 | |
53 // http header | |
54 // etag: epoch-tag | |
55 let e = self.epoch_tag.borrow(); | |
56 if !e.is_empty() { | |
57 let mut list = easy::List::new(); | |
58 let hd = format!("etag: {}", *e); | |
59 list.append(&hd).unwrap(); | |
60 req.http_headers(list).unwrap(); | |
61 } | |
62 | |
57 req | 63 req |
58 } | 64 } |
59 | 65 |
60 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { | 66 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { |
61 let text = String::from_utf8_lossy(buf).to_string(); | 67 let text = String::from_utf8_lossy(buf).to_string(); |
62 let resp = req.response_code()?; | 68 let resp = req.response_code()?; |
63 match resp { | 69 match resp { |
64 200 => { | 70 200 => { |
65 // new params | 71 // new params |
66 let r: Response = serde_json::from_str(&text)?; | 72 let r: Response = serde_json::from_str(&text)?; |
67 // XXX | 73 *self.epoch_tag.borrow_mut() = r.epoch_tag; |
68 //self.epoch_tag = r.epoch_tag; | |
69 Ok(Some(r.params)) | 74 Ok(Some(r.params)) |
70 } | 75 } |
71 304 => Ok(None), // unmodified (long polling timeout at the server) | 76 304 => Ok(None), // unmodified (long polling timeout at the server) |
72 _ => { | 77 _ => { |
73 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) | 78 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) |
74 }, | 79 }, |
75 } | 80 } |
76 } | 81 } |
77 | 82 |
78 fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { | 83 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { |
79 if self.session.is_none() { | 84 |
80 self.session = Some(Session::new(self.handle.clone())) | 85 if rself.session.borrow().is_none() { |
86 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); | |
81 } | 87 } |
88 let ses = rself.session.borrow().clone().unwrap(); | |
82 | 89 |
83 let mut req = self.make_req(); | 90 let mut req = rself.make_request(); |
84 let buf = Arc::new(Mutex::new(Vec::new())); | 91 let buf = Arc::new(Mutex::new(Vec::new())); |
85 | 92 |
86 let dst = buf.clone(); | 93 let dst = buf.clone(); |
87 req.write_function(move |data| { | 94 req.write_function(move |data| { |
88 let mut dst = dst.lock().unwrap(); | 95 let mut dst = dst.lock().unwrap(); |
93 } else { | 100 } else { |
94 Ok(data.len()) | 101 Ok(data.len()) |
95 } | 102 } |
96 }).unwrap(); | 103 }).unwrap(); |
97 | 104 |
98 // XXX too many clones | 105 let s = ses.perform(req) |
99 let se = self.clone(); | |
100 let s = self.clone().session.unwrap().perform(req) | |
101 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) | 106 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) |
102 .and_then(move |mut es| { | 107 .and_then(move |mut rq| { |
103 let result = buf.lock().unwrap(); | 108 let result = buf.lock().unwrap(); |
104 se.handle_response(&mut es, &result) | 109 rself.handle_response(&mut rq, &result) |
105 }); | 110 }); |
106 Box::new(s) | 111 Box::new(s) |
107 | 112 |
108 /* | 113 /* |
109 let mut p = Params::defaults(); | 114 let mut p = Params::defaults(); |
110 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); | 115 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); |
111 future::ok(Some(p)).boxed() | 116 future::ok(Some(p)).boxed() |
112 */ | 117 */ |
113 } | 118 } |
114 | 119 |
115 pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> { | 120 pub fn new(config: &Config, handle: &Handle) -> Self { |
121 ParamWaiter { | |
122 limitlog: NotTooOften::new(LOG_MINUTES*60), | |
123 epoch_tag: RefCell::new(String::new()), | |
124 session: RefCell::new(None), | |
125 config: config.clone(), | |
126 handle: handle.clone(), | |
127 } | |
128 } | |
129 | |
130 | |
131 pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> { | |
132 let rcself = Rc::new(ParamWaiter::new(config, handle)); | |
116 | 133 |
117 let dur = Duration::from_millis(4000); | 134 let dur = Duration::from_millis(4000); |
118 let mut s = Rc::new(self.clone()); | 135 let i = Interval::new(dur, &rcself.handle).unwrap() |
119 let i = Interval::new(dur, &self.handle).unwrap() | |
120 .map_err(|e| TemplogError::new_io("interval failed", e)) | 136 .map_err(|e| TemplogError::new_io("interval failed", e)) |
121 .and_then(move |()| { | 137 .and_then(move |()| { |
122 let ss = Rc::get_mut(&mut s).unwrap(); | 138 Self::step(rcself.clone()) |
123 ss.step() | |
124 }) | 139 }) |
125 // throw away None params | 140 // throw away None params |
126 .filter_map(|p| p); | 141 .filter_map(|p| p); |
127 Box::new(i) | 142 Box::new(i) |
128 } | 143 } |