Mercurial > templog
comparison rust/src/params.rs @ 626:efcbe0d3afd6 rust
fix to work with hyper
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Wed, 06 Dec 2017 00:09:45 +0800 |
parents | 2710649ab71e |
children | d5075136442f |
comparison
equal
deleted
inserted
replaced
625:8152ef251dbb | 626:efcbe0d3afd6 |
---|---|
15 use std::fs::File; | 15 use std::fs::File; |
16 use std::io::Read; | 16 use std::io::Read; |
17 | 17 |
18 use tokio_core::reactor::Interval; | 18 use tokio_core::reactor::Interval; |
19 use tokio_core::reactor::Handle; | 19 use tokio_core::reactor::Handle; |
20 use tokio_curl::Session; | |
21 use futures::{Stream,Future,future}; | 20 use futures::{Stream,Future,future}; |
22 use curl::easy::Easy; | |
23 use curl::easy; | |
24 use self::rand::Rng; | 21 use self::rand::Rng; |
22 use std::str::FromStr; | |
23 use hyper; | |
24 use hyper::header::{Headers, ETag, EntityTag}; | |
25 use hyper::client::Client; | |
25 | 26 |
26 use types::*; | 27 use types::*; |
27 use ::Config; | 28 use ::Config; |
28 | 29 |
29 #[derive(Deserialize, Serialize, Debug, Clone)] | 30 #[derive(Deserialize, Serialize, Debug, Clone)] |
75 #[derive(Clone)] | 76 #[derive(Clone)] |
76 pub struct ParamWaiter { | 77 pub struct ParamWaiter { |
77 limitlog: NotTooOften, | 78 limitlog: NotTooOften, |
78 // last_etag is used for long-polling. | 79 // last_etag is used for long-polling. |
79 last_etag: RefCell<String>, | 80 last_etag: RefCell<String>, |
80 session: RefCell<Option<Session>>, | |
81 epoch: String, | 81 epoch: String, |
82 | 82 |
83 config: Config, | 83 config: Config, |
84 handle: Handle, | 84 handle: Handle, |
85 } | 85 } |
87 const LOG_MINUTES: u64 = 15; | 87 const LOG_MINUTES: u64 = 15; |
88 const MAX_RESPONSE_SIZE: usize = 10000; | 88 const MAX_RESPONSE_SIZE: usize = 10000; |
89 const TIMEOUT_MINUTES: u64 = 5; | 89 const TIMEOUT_MINUTES: u64 = 5; |
90 | 90 |
91 impl ParamWaiter { | 91 impl ParamWaiter { |
92 fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) { | 92 fn make_request(&self) -> hyper::client::FutureResponse { |
93 let mut req = Easy::new(); | 93 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); |
94 req.get(true).unwrap(); | 94 let mut req = hyper::client::Request::new(hyper::Method::Get, uri); |
95 // supposedly req.url won't fail, checking happens later? | 95 { |
96 req.url(&self.config.SETTINGS_URL).unwrap(); | 96 let mut headers = req.headers_mut(); |
97 | 97 headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); |
98 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); | |
99 | |
100 // store response | |
101 let max_response_size = 10000; | |
102 let buf = Arc::new(Mutex::new(Vec::new())); | |
103 let dst = buf.clone(); | |
104 req.write_function(move |data| { | |
105 let mut dst = dst.lock().unwrap(); | |
106 dst.extend_from_slice(data); | |
107 if dst.len() > max_response_size { | |
108 error!("Too large params response from server: {}", dst.len()); | |
109 Ok(0) | |
110 } else { | |
111 Ok(data.len()) | |
112 } | |
113 }).unwrap(); | |
114 | |
115 // http header | |
116 let e = self.last_etag.borrow(); | |
117 if !e.is_empty() { | |
118 let mut list = easy::List::new(); | |
119 let hd = format!("etag: {}", *e); | |
120 list.append(&hd).unwrap(); | |
121 req.http_headers(list).unwrap(); | |
122 } | 98 } |
123 | 99 hyper::client::Client::new(&self.handle).request(req) |
124 (req, buf) | 100 // XXX how do we do this? |
101 // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap(); | |
125 } | 102 } |
126 | 103 |
127 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Params, TemplogError> { | 104 |
128 let text = String::from_utf8_lossy(buf).to_string(); | 105 fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> { |
129 let resp = req.response_code()?; | 106 let text = String::from_utf8_lossy(buf.as_ref()); |
130 match resp { | 107 match status { |
131 200 => { | 108 hyper::StatusCode::Ok => { |
132 // new params | 109 // new params |
133 let r: Response = serde_json::from_str(&text)?; | 110 let r: Response = serde_json::from_str(&text)?; |
134 let mut le = self.last_etag.borrow_mut(); | 111 let mut le = self.last_etag.borrow_mut(); |
135 *le = r.etag; | 112 *le = r.etag; |
136 if le.split('-').next().unwrap() == &self.epoch { | 113 |
137 self.write_params(&r.params); | 114 // update params if the epoch is correct |
138 Ok(r.params) | 115 if let Some(e) = le.split('-').next() { |
139 } else { | 116 if e == &self.epoch { |
140 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) | 117 self.write_params(&r.params); |
118 return Ok(r.params); | |
119 } | |
141 } | 120 } |
121 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) | |
142 } | 122 } |
143 304 => { | 123 hyper::StatusCode::NotModified => { |
144 // XXX this isn't really an error | 124 // XXX this isn't really an error |
145 Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) | 125 Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) |
146 }, | 126 }, |
147 _ => { | 127 _ => { |
148 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) | 128 Err(TemplogError::new(&format!("Wrong server response code {}: {}", status.as_u16(), text))) |
149 }, | 129 }, |
150 } | 130 } |
151 } | 131 } |
152 | 132 |
153 fn write_params(&self, params: &Params) { | 133 fn write_params(&self, params: &Params) { |
156 serde_json::to_writer(f, params) | 136 serde_json::to_writer(f, params) |
157 }); | 137 }); |
158 } | 138 } |
159 | 139 |
160 fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { | 140 fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { |
141 let resp = rself.make_request(); | |
161 | 142 |
162 if rself.session.borrow().is_none() { | 143 let s = resp.map_err(|e| TemplogError::new_hyper("response", e)) |
163 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); | 144 .and_then(move |r| { |
164 } | 145 let status = r.status(); |
165 | 146 r.body().concat2() |
166 let (mut req, buf) = rself.make_request(); | 147 .map_err(|e| TemplogError::new_hyper("body", e)) |
167 | 148 .and_then(move |b| { |
168 let ses = rself.session.borrow().clone().unwrap(); | 149 rself.handle_response(b, status) |
169 let s = ses.perform(req) | 150 }) |
170 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) | 151 }); |
171 .and_then(move |mut rq| { | |
172 let result = buf.lock().unwrap(); | |
173 rself.handle_response(&mut rq, &result) | |
174 }); | |
175 Box::new(s) | 152 Box::new(s) |
176 } | 153 } |
177 | 154 |
178 fn new(config: &Config, handle: &Handle) -> Self { | 155 fn new(config: &Config, handle: &Handle) -> Self { |
179 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 | 156 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 |
180 rand::OsRng::new().unwrap().fill_bytes(&mut b); | 157 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b); |
181 let epoch = base64::encode(&b); | 158 let epoch = base64::encode(&b); |
182 ParamWaiter { | 159 ParamWaiter { |
183 limitlog: NotTooOften::new(LOG_MINUTES*60), | 160 limitlog: NotTooOften::new(LOG_MINUTES*60), |
184 last_etag: RefCell::new(String::new()), | 161 last_etag: RefCell::new(String::new()), |
185 session: RefCell::new(None), | |
186 epoch: epoch, | 162 epoch: epoch, |
187 config: config.clone(), | 163 config: config.clone(), |
188 handle: handle.clone(), | 164 handle: handle.clone(), |
189 } | 165 } |
190 } | 166 } |