Mercurial > templog
comparison rust/src/params.rs @ 634:a5721c02d3ee rust
build succeeds
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Sun, 22 Sep 2019 20:35:40 +0800 |
parents | 490e9e15b98c |
children | 4424a8b30f9c |
comparison
equal
deleted
inserted
replaced
633:490e9e15b98c | 634:a5721c02d3ee |
---|---|
1 use std::time::Duration; | 1 use std::time::Duration; |
2 use std::io; | 2 |
3 use std::str; | 3 use std::str; |
4 use std::rc::Rc; | 4 |
5 use std::sync::{Arc,Mutex}; | 5 |
6 use std::error::Error; | 6 |
7 use std::cell::{Cell,RefCell}; | 7 use std::cell::{RefCell}; |
8 use std::fs::File; | 8 use std::fs::File; |
9 use std::io::Read; | 9 use std::io::Read; |
10 | 10 |
11 use serde::{Serialize,Deserialize}; | 11 use serde::{Serialize,Deserialize}; |
12 | 12 |
13 use rand::rngs::{StdRng, OsRng}; | 13 use rand::rngs::{OsRng}; |
14 use rand::{RngCore, SeedableRng}; | 14 use rand::{RngCore}; |
15 | 15 |
16 use std::str::FromStr; | 16 |
17 use hyper; | 17 use hyper; |
18 use hyper::client::Client; | 18 |
19 // for try_concat() | |
20 use futures::stream::TryStreamExt; | |
21 use futures::executor::block_on; | |
22 // for block_on().or_else | |
23 use futures_util::try_future::TryFutureExt; | |
19 | 24 |
20 use riker::actors::*; | 25 use riker::actors::*; |
21 | 26 |
22 use super::types::*; | 27 use super::types::*; |
23 use super::config::Config; | 28 use super::config::Config; |
39 Params { | 44 Params { |
40 fridge_setpoint: 16.0, | 45 fridge_setpoint: 16.0, |
41 fridge_difference: 0.2, | 46 fridge_difference: 0.2, |
42 overshoot_delay: 720, // 12 minutes | 47 overshoot_delay: 720, // 12 minutes |
43 overshoot_factor: 1.0, | 48 overshoot_factor: 1.0, |
44 disabled: false, | 49 disabled: true, |
45 nowort: false, | 50 nowort: false, |
46 fridge_range_lower: 3.0, | 51 fridge_range_lower: 3.0, |
47 fridge_range_upper: 3.0, | 52 fridge_range_upper: 3.0, |
48 } | 53 } |
49 } | 54 } |
64 pub struct ParamWaiter { | 69 pub struct ParamWaiter { |
65 limitlog: NotTooOften, | 70 limitlog: NotTooOften, |
66 // last_etag is used for long-polling. | 71 // last_etag is used for long-polling. |
67 last_etag: RefCell<String>, | 72 last_etag: RefCell<String>, |
68 epoch: String, | 73 epoch: String, |
69 chan: ChannelRef<Params>, | 74 chan: Option<ChannelRef<Params>>, // TODO: a way to avoid Option? |
70 | 75 |
71 config: Config, | 76 config: Config, |
72 } | 77 } |
73 | 78 |
74 const LOG_MINUTES: u64 = 15; | 79 const LOG_MINUTES: u64 = 15; |
75 const MAX_RESPONSE_SIZE: usize = 10000; | 80 const MAX_RESPONSE_SIZE: usize = 10000; |
76 const TIMEOUT_MINUTES: u64 = 5; | 81 const TIMEOUT_MINUTES: u64 = 5; |
77 | 82 |
78 impl ParamWaiter { | 83 impl ParamWaiter { |
79 | 84 |
80 fn new(config: Config) -> Self { | 85 pub fn new(config: Config) -> Self { |
81 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 | 86 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 |
82 OsRng.fill_bytes(&mut b); | 87 OsRng.fill_bytes(&mut b); |
83 let epoch = base64::encode(&b); | 88 let epoch = base64::encode(&b); |
84 | 89 |
85 ParamWaiter { | 90 ParamWaiter { |
86 limitlog: NotTooOften::new(LOG_MINUTES*60), | 91 limitlog: NotTooOften::new(LOG_MINUTES*60), |
87 last_etag: RefCell::new(String::new()), | 92 last_etag: RefCell::new(String::new()), |
88 epoch: epoch, | 93 epoch: epoch, |
94 chan: None, | |
89 config: config, | 95 config: config, |
90 } | 96 } |
91 } | 97 } |
92 | 98 |
93 | 99 async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { |
94 async fn keep_waiting(&mut self) { | 100 let req = hyper::Request::get(uri) |
95 loop { | 101 .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) |
96 self.wait_updates().await; | 102 .body(hyper::Body::from("")).unwrap(); |
97 } | 103 |
98 } | 104 // TODO timeout? |
99 | 105 let resp = hyper::Client::new().request(req).await?; |
100 async fn wait_updates(&mut self) { | 106 let status = resp.status(); |
101 | 107 let chunk = resp.into_body().try_concat().await?; |
102 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); | 108 |
103 let mut req = hyper::Request::new(hyper::Method::Get, uri); | 109 Ok((chunk, status)) |
104 req.headers_mut().insert(hyper::header::ETAG, self.last_etag.borrow()); | |
105 let resp = hyper::Client::new(&self.handle).request(req).await?; | |
106 let b = resp.body().concat2().await?; | |
107 let new_params = self.handle_response(b)?; | |
108 self.chan.tell(Publish{msg: new_params, topic: "params".into()}, None); | |
109 } | 110 } |
110 | 111 |
111 fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> { | 112 fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> { |
113 | |
112 #[derive(Deserialize, Debug)] | 114 #[derive(Deserialize, Debug)] |
113 struct Response { | 115 struct Response { |
114 // sent as an opaque etag: header. Has format "epoch-nonce", | 116 // sent as an opaque etag: header. Has format "epoch-nonce", |
115 // responses where the epoch does not match ParamWaiter::epoch are dropped | 117 // responses where the epoch does not match ParamWaiter::epoch are dropped |
116 etag: String, | 118 etag: String, |
117 params: Params, | 119 params: Params, |
118 } | 120 } |
119 | 121 |
120 let text = String::from_utf8_lossy(buf.as_ref()); | |
121 match status { | 122 match status { |
122 hyper::StatusCode::Ok => { | 123 hyper::StatusCode::OK => { |
123 // new params | 124 // new params |
124 let r: Response = serde_json::from_str(&text)?; | 125 let r: Response = serde_json::from_slice(&buf)?; |
125 let mut le = self.last_etag.borrow_mut(); | 126 let mut le = self.last_etag.borrow_mut(); |
126 *le = r.etag; | 127 *le = r.etag; |
127 | 128 |
128 // update params if the epoch is correct | 129 // update params if the epoch is correct |
129 if let Some(e) = le.split('-').next() { | 130 if let Some(e) = le.split('-').next() { |
132 return Ok(r.params); | 133 return Ok(r.params); |
133 } | 134 } |
134 } | 135 } |
135 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) | 136 Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch))) |
136 } | 137 } |
137 hyper::StatusCode::NotModified => { | 138 hyper::StatusCode::NOT_MODIFIED => { |
138 // XXX this isn't really an error. Should handle_response() return | 139 // XXX this isn't really an error. Should handle_response() return |
139 // Result<Option<Params>, TemplogError> instead? | 140 // Result<Option<Params>, TemplogError> instead? |
140 | |
141 Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) | 141 Err(TemplogError::new("304 unmodified (long polling timeout at the server)")) |
142 }, | 142 }, |
143 _ => { | 143 _ => { |
144 let text = String::from_utf8_lossy(buf.as_ref()); | |
144 Err(TemplogError::new(&format!("Wrong server response code {}: {}", status.as_u16(), text))) | 145 Err(TemplogError::new(&format!("Wrong server response code {}: {}", status.as_u16(), text))) |
145 }, | 146 }, |
146 } | 147 } |
147 } | 148 } |
148 | 149 |
150 let p = atomicwrites::AtomicFile::new(&self.config.PARAMS_FILE, atomicwrites::AllowOverwrite); | 151 let p = atomicwrites::AtomicFile::new(&self.config.PARAMS_FILE, atomicwrites::AllowOverwrite); |
151 p.write(|f| { | 152 p.write(|f| { |
152 serde_json::to_writer(f, params) | 153 serde_json::to_writer(f, params) |
153 }); | 154 }); |
154 } | 155 } |
155 } | 156 |
157 fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> { | |
158 let url = self.config.SETTINGS_URL.clone(); | |
159 let etag = self.last_etag.borrow().clone(); | |
160 let h = ctx.run(async move { | |
161 Self::wait_updates(&url, &etag).await | |
162 }).expect("spawn failed"); // XXX error handling | |
163 let (chunk, stat) = block_on(h)?; | |
164 let new_params = self.handle_response(chunk, stat)?; | |
165 self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None); | |
166 Ok(()) | |
167 } | |
168 } | |
169 | |
170 #[derive(Clone,Debug)] | |
171 pub struct PollForParams; | |
156 | 172 |
157 impl Actor for ParamWaiter { | 173 impl Actor for ParamWaiter { |
158 | 174 type Msg = PollForParams; |
159 fn post_start(&mut self, ctx: &Context<Self::Msg>) { | 175 |
160 self.chan = channel("params", &ctx.system).unwrap(); | 176 fn recv(&mut self, |
161 ctx.run(self.wait_updates()); | 177 ctx: &Context<Self::Msg>, |
178 _msg: Self::Msg, | |
179 _sender: Sender) { | |
180 // schedule a retry once this iteration finishes | |
181 ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); | |
182 | |
183 if let Err(e) = self.do_poll(ctx) { | |
184 warn!("Problem fetching params: {}", e); | |
185 } | |
186 } | |
187 | |
188 fn pre_start(&mut self, ctx: &Context<Self::Msg>) { | |
189 self.chan = Some(channel("params", &ctx.system).unwrap()); | |
190 ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); | |
162 } | 191 } |
163 } | 192 } |
164 | 193 |
165 // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { | 194 // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { |
166 // let rcself = Rc::new(ParamWaiter::new(config, handle)); | 195 // let rcself = Rc::new(ParamWaiter::new(config, handle)); |