comparison rust/src/params.rs @ 639:89818a14648b rust tip

- switch to using anyhow for errors and surf for HTTP; it runs, but surf has problems
author Matt Johnston <matt@ucc.asn.au>
date Thu, 28 Nov 2019 23:57:00 +0800
parents a9f353f488d0
children
comparison
equal deleted inserted replaced
638:a9f353f488d0 639:89818a14648b
1 use anyhow::{Result,bail};
2
1 use std::time::Duration; 3 use std::time::Duration;
2 4
3 use std::str; 5 use std::str;
4
5
6 6
7 use std::cell::{RefCell}; 7 use std::cell::{RefCell};
8 use std::fs::File; 8 use std::fs::File;
9 use std::io::Read; 9 use std::io::Read;
10 10
11 use serde::{Serialize,Deserialize}; 11 use serde::{Serialize,Deserialize};
12 12
13 use rand::rngs::{OsRng}; 13 use rand::rngs::{OsRng};
14 use rand::{RngCore}; 14 use rand::{RngCore};
15 15
16 16 use async_std::task;
17 use hyper; 17
18 18 use surf;
19 // for try_concat()
20 use futures::stream::TryStreamExt;
21 use futures::executor::block_on;
22 19
23 use riker::actors::*; 20 use riker::actors::*;
24 21
25 use super::types::*; 22 use super::types::*;
26 use super::config::Config; 23 use super::config::Config;
49 fridge_range_lower: 3.0, 46 fridge_range_lower: 3.0,
50 fridge_range_upper: 3.0, 47 fridge_range_upper: 3.0,
51 } 48 }
52 } 49 }
53 50
54 fn try_load(filename: &str) -> Result<Params, TemplogError> { 51 fn try_load(filename: &str) -> Result<Params> {
55 let mut s = String::new(); 52 let mut s = String::new();
56 File::open(filename)?.read_to_string(&mut s)?; 53 File::open(filename)?.read_to_string(&mut s)?;
57 Ok(serde_json::from_str(&s)?) 54 Ok(serde_json::from_str(&s)?)
58 } 55 }
59 56
95 config: config, 92 config: config,
96 notify: notify, 93 notify: notify,
97 } 94 }
98 } 95 }
99 96
100 async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { 97 async fn wait_updates(uri: &str, etag: &str) -> Result<(Vec<u8>, u16)> {
101 debug!("wait_updates {}", uri); 98 debug!("wait_updates {}", uri);
102 let req = hyper::Request::get(uri) 99 // XXX Workaround for https://github.com/dtolnay/anyhow/issues/35
103 .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) 100 let mut resp = match surf::get(uri)
104 .body(hyper::Body::from("")).unwrap(); 101 .set_header("etag", etag).await {
102 Ok(r) => r,
103 Err(e) => bail!(e),
104 };
105 105
106 // TODO timeout? 106 // TODO timeout?
107 let resp = hyper::Client::new().request(req).await?;
108 let status = resp.status(); 107 let status = resp.status();
109 let chunk = resp.into_body().try_concat().await?; 108 let bytes = resp.body_bytes().await?;
110 109
111 Ok((chunk, status)) 110 Ok((bytes, status.into()))
112 } 111 }
113 112
114 fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> { 113 fn handle_response(&self, contents : &Vec<u8>, status: u16) -> Result<Params> {
115 114
116 #[derive(Deserialize, Debug)] 115 #[derive(Deserialize, Debug)]
117 struct Response { 116 struct Response {
118 // sent as an opaque etag: header. Has format "epoch-nonce", 117 // sent as an opaque etag: header. Has format "epoch-nonce",
119 // responses where the epoch does not match ParamWaiter::epoch are dropped 118 // responses where the epoch does not match ParamWaiter::epoch are dropped
120 etag: String, 119 etag: String,
121 params: Params, 120 params: Params,
122 } 121 }
123 122
124 match status { 123 match status {
125 hyper::StatusCode::OK => { 124 200 => {
125 // 200 OK
126 // new params 126 // new params
127 let r: Response = serde_json::from_slice(&buf)?; 127 let r: Response = serde_json::from_slice(contents)?;
128 let mut le = self.last_etag.borrow_mut(); 128 let mut le = self.last_etag.borrow_mut();
129 *le = r.etag; 129 *le = r.etag;
130 130
131 // update params if the epoch is correct 131 // update params if the epoch is correct
132 if let Some(e) = le.split('-').next() { 132 if let Some(e) = le.split('-').next() {
133 if e == &self.epoch { 133 if e == &self.epoch {
134 self.write_params(&r.params); 134 self.write_params(&r.params);
135 return Ok(r.params); 135 return Ok(r.params);
136 } 136 }
137 } 137 }
138 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) 138 bail!("Bad epoch from server '{}' expected '{}'", *le, self.epoch);
139 } 139 }
140 hyper::StatusCode::NOT_MODIFIED => { 140 304 => {
141 // 304 Not Modified
141 // XXX this isn't really an error. Should handle_response() return 142 // XXX this isn't really an error. Should handle_response() return
142 // Result<Option<Params>, TemplogError> instead? 143 // Result<Option<Params>, TemplogError> instead?
143 Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) 144 bail!("304 unmodified (long polling timeout at the server)");
144 }, 145 },
145 _ => { 146 _ => {
146 let text = String::from_utf8_lossy(buf.as_ref()); 147 let text = String::from_utf8_lossy(contents);
147 Err(TemplogError::new(&format!("Wrong server response code {}: {}", status.as_u16(), text))) 148 bail!("Wrong server response code {}: {}", status, text);
148 }, 149 },
149 } 150 }
150 } 151 }
151 152
152 fn write_params(&self, params: &Params) { 153 fn write_params(&self, params: &Params) {
158 // XXX notify? 159 // XXX notify?
159 error!("Couldn't write to {}: {}", self.config.params_file, e) 160 error!("Couldn't write to {}: {}", self.config.params_file, e)
160 }; 161 };
161 } 162 }
162 163
163 fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> { 164 fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<()> {
164 let url = self.config.settings_url.clone(); 165 let url = self.config.settings_url.clone();
165 let etag = self.last_etag.borrow().clone(); 166 let etag = self.last_etag.borrow().clone();
166 let h = ctx.run(async move { 167 let h = ctx.run(async move {
167 Self::wait_updates(&url, &etag).await 168 Self::wait_updates(&url, &etag).await
168 }).expect("spawn failed"); // XXX error handling 169 }).expect("spawn failed"); // XXX error handling
169 let (chunk, stat) = block_on(h)?; 170 let (contents, stat) = task::block_on(h)?;
170 let new_params = self.handle_response(chunk, stat)?; 171 let new_params = self.handle_response(&contents, stat)?;
171 self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None); 172 self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None);
172 Ok(()) 173 Ok(())
173 } 174 }
174 } 175 }
175 176