comparison rust/src/params.rs @ 626:efcbe0d3afd6 rust

fix to work with hyper
author Matt Johnston <matt@ucc.asn.au>
date Wed, 06 Dec 2017 00:09:45 +0800
parents 2710649ab71e
children d5075136442f
comparison
equal deleted inserted replaced
625:8152ef251dbb 626:efcbe0d3afd6
15 use std::fs::File; 15 use std::fs::File;
16 use std::io::Read; 16 use std::io::Read;
17 17
18 use tokio_core::reactor::Interval; 18 use tokio_core::reactor::Interval;
19 use tokio_core::reactor::Handle; 19 use tokio_core::reactor::Handle;
20 use tokio_curl::Session;
21 use futures::{Stream,Future,future}; 20 use futures::{Stream,Future,future};
22 use curl::easy::Easy;
23 use curl::easy;
24 use self::rand::Rng; 21 use self::rand::Rng;
22 use std::str::FromStr;
23 use hyper;
24 use hyper::header::{Headers, ETag, EntityTag};
25 use hyper::client::Client;
25 26
26 use types::*; 27 use types::*;
27 use ::Config; 28 use ::Config;
28 29
29 #[derive(Deserialize, Serialize, Debug, Clone)] 30 #[derive(Deserialize, Serialize, Debug, Clone)]
75 #[derive(Clone)] 76 #[derive(Clone)]
76 pub struct ParamWaiter { 77 pub struct ParamWaiter {
77 limitlog: NotTooOften, 78 limitlog: NotTooOften,
78 // last_etag is used for long-polling. 79 // last_etag is used for long-polling.
79 last_etag: RefCell<String>, 80 last_etag: RefCell<String>,
80 session: RefCell<Option<Session>>,
81 epoch: String, 81 epoch: String,
82 82
83 config: Config, 83 config: Config,
84 handle: Handle, 84 handle: Handle,
85 } 85 }
87 const LOG_MINUTES: u64 = 15; 87 const LOG_MINUTES: u64 = 15;
88 const MAX_RESPONSE_SIZE: usize = 10000; 88 const MAX_RESPONSE_SIZE: usize = 10000;
89 const TIMEOUT_MINUTES: u64 = 5; 89 const TIMEOUT_MINUTES: u64 = 5;
90 90
91 impl ParamWaiter { 91 impl ParamWaiter {
92 fn make_request(&self) -> (Easy, Arc<Mutex<Vec<u8>>>) { 92 fn make_request(&self) -> hyper::client::FutureResponse {
93 let mut req = Easy::new(); 93 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config");
94 req.get(true).unwrap(); 94 let mut req = hyper::client::Request::new(hyper::Method::Get, uri);
95 // supposedly req.url won't fail, checking happens later? 95 {
96 req.url(&self.config.SETTINGS_URL).unwrap(); 96 let mut headers = req.headers_mut();
97 97 headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone())));
98 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
99
100 // store response
101 let max_response_size = 10000;
102 let buf = Arc::new(Mutex::new(Vec::new()));
103 let dst = buf.clone();
104 req.write_function(move |data| {
105 let mut dst = dst.lock().unwrap();
106 dst.extend_from_slice(data);
107 if dst.len() > max_response_size {
108 error!("Too large params response from server: {}", dst.len());
109 Ok(0)
110 } else {
111 Ok(data.len())
112 }
113 }).unwrap();
114
115 // http header
116 let e = self.last_etag.borrow();
117 if !e.is_empty() {
118 let mut list = easy::List::new();
119 let hd = format!("etag: {}", *e);
120 list.append(&hd).unwrap();
121 req.http_headers(list).unwrap();
122 } 98 }
123 99 hyper::client::Client::new(&self.handle).request(req)
124 (req, buf) 100 // XXX how do we do this?
101 // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
125 } 102 }
126 103
127 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Params, TemplogError> { 104
128 let text = String::from_utf8_lossy(buf).to_string(); 105 fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> {
129 let resp = req.response_code()?; 106 let text = String::from_utf8_lossy(buf.as_ref());
130 match resp { 107 match status {
131 200 => { 108 hyper::StatusCode::Ok => {
132 // new params 109 // new params
133 let r: Response = serde_json::from_str(&text)?; 110 let r: Response = serde_json::from_str(&text)?;
134 let mut le = self.last_etag.borrow_mut(); 111 let mut le = self.last_etag.borrow_mut();
135 *le = r.etag; 112 *le = r.etag;
136 if le.split('-').next().unwrap() == &self.epoch { 113
137 self.write_params(&r.params); 114 // update params if the epoch is correct
138 Ok(r.params) 115 if let Some(e) = le.split('-').next() {
139 } else { 116 if e == &self.epoch {
140 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) 117 self.write_params(&r.params);
118 return Ok(r.params);
119 }
141 } 120 }
121 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch)))
142 } 122 }
143 304 => { 123 hyper::StatusCode::NotModified => {
144 // XXX this isn't really an error 124 // XXX this isn't really an error
145 Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) 125 Err(TemplogError::new("304 unmodified (long polling timeout at the server)"))
146 }, 126 },
147 _ => { 127 _ => {
148 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) 128 Err(TemplogError::new(&format!("Wrong server response code {}: {}", status.as_u16(), text)))
149 }, 129 },
150 } 130 }
151 } 131 }
152 132
153 fn write_params(&self, params: &Params) { 133 fn write_params(&self, params: &Params) {
156 serde_json::to_writer(f, params) 136 serde_json::to_writer(f, params)
157 }); 137 });
158 } 138 }
159 139
160 fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> { 140 fn step(rself: Rc<Self>) -> Box<Future<Item=Params, Error=TemplogError>> {
141 let resp = rself.make_request();
161 142
162 if rself.session.borrow().is_none() { 143 let s = resp.map_err(|e| TemplogError::new_hyper("response", e))
163 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone())); 144 .and_then(move |r| {
164 } 145 let status = r.status();
165 146 r.body().concat2()
166 let (mut req, buf) = rself.make_request(); 147 .map_err(|e| TemplogError::new_hyper("body", e))
167 148 .and_then(move |b| {
168 let ses = rself.session.borrow().clone().unwrap(); 149 rself.handle_response(b, status)
169 let s = ses.perform(req) 150 })
170 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) 151 });
171 .and_then(move |mut rq| {
172 let result = buf.lock().unwrap();
173 rself.handle_response(&mut rq, &result)
174 });
175 Box::new(s) 152 Box::new(s)
176 } 153 }
177 154
178 fn new(config: &Config, handle: &Handle) -> Self { 155 fn new(config: &Config, handle: &Handle) -> Self {
179 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 156 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
180 rand::OsRng::new().unwrap().fill_bytes(&mut b); 157 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b);
181 let epoch = base64::encode(&b); 158 let epoch = base64::encode(&b);
182 ParamWaiter { 159 ParamWaiter {
183 limitlog: NotTooOften::new(LOG_MINUTES*60), 160 limitlog: NotTooOften::new(LOG_MINUTES*60),
184 last_etag: RefCell::new(String::new()), 161 last_etag: RefCell::new(String::new()),
185 session: RefCell::new(None),
186 epoch: epoch, 162 epoch: epoch,
187 config: config.clone(), 163 config: config.clone(),
188 handle: handle.clone(), 164 handle: handle.clone(),
189 } 165 }
190 } 166 }