comparison rust/src/params.rs @ 633:490e9e15b98c rust

move some bits to riker
author Matt Johnston <matt@ucc.asn.au>
date Wed, 04 Sep 2019 23:24:13 +0800
parents bde302def78e
children a5721c02d3ee
comparison
equal deleted inserted replaced
632:bde302def78e 633:490e9e15b98c
6 use std::error::Error; 6 use std::error::Error;
7 use std::cell::{Cell,RefCell}; 7 use std::cell::{Cell,RefCell};
8 use std::fs::File; 8 use std::fs::File;
9 use std::io::Read; 9 use std::io::Read;
10 10
11 use serde::{Serialize,Deserialize};
11 12
12 use rand::Rng; 13 use rand::rngs::{StdRng, OsRng};
14 use rand::{RngCore, SeedableRng};
15
13 use std::str::FromStr; 16 use std::str::FromStr;
14 use hyper; 17 use hyper;
15 use hyper::header::{Headers, ETag, EntityTag};
16 use hyper::client::Client; 18 use hyper::client::Client;
17 19
18 use crate::types::*; 20 use riker::actors::*;
19 use ::Config; 21
22 use super::types::*;
23 use super::config::Config;
20 24
21 #[derive(Deserialize, Serialize, Debug, Clone)] 25 #[derive(Deserialize, Serialize, Debug, Clone)]
22 pub struct Params { 26 pub struct Params {
23 pub fridge_setpoint: f32, 27 pub fridge_setpoint: f32,
24 pub fridge_difference: f32, 28 pub fridge_difference: f32,
52 56
53 pub fn load(config: &Config) -> Params { 57 pub fn load(config: &Config) -> Params {
54 Self::try_load(&config.PARAMS_FILE) 58 Self::try_load(&config.PARAMS_FILE)
55 .unwrap_or_else(|_| Params::defaults()) 59 .unwrap_or_else(|_| Params::defaults())
56 } 60 }
61
57 } 62 }
58 63
59 #[derive(Deserialize, Debug)]
60 struct Response {
61 // sent as an opaque etag: header. Has format "epoch-nonce",
62 // responses where the epoch do not match ParamWaiter::epoch are dropped
63 etag: String,
64 params: Params,
65 }
66
67 #[derive(Clone)]
68 pub struct ParamWaiter { 64 pub struct ParamWaiter {
69 limitlog: NotTooOften, 65 limitlog: NotTooOften,
70 // last_etag is used for long-polling. 66 // last_etag is used for long-polling.
71 last_etag: RefCell<String>, 67 last_etag: RefCell<String>,
72 epoch: String, 68 epoch: String,
69 chan: ChannelRef<Params>,
73 70
74 config: Config, 71 config: Config,
75 handle: Handle,
76 } 72 }
77 73
78 const LOG_MINUTES: u64 = 15; 74 const LOG_MINUTES: u64 = 15;
79 const MAX_RESPONSE_SIZE: usize = 10000; 75 const MAX_RESPONSE_SIZE: usize = 10000;
80 const TIMEOUT_MINUTES: u64 = 5; 76 const TIMEOUT_MINUTES: u64 = 5;
81 77
82 impl ParamWaiter { 78 impl ParamWaiter {
83 fn make_request(&self) -> hyper::client::FutureResponse { 79
84 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config"); 80 fn new(config: Config) -> Self {
85 let mut req = hyper::client::Request::new(hyper::Method::Get, uri); 81 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
86 { 82 OsRng.fill_bytes(&mut b);
87 let headers = req.headers_mut(); 83 let epoch = base64::encode(&b);
88 headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone()))); 84
85 ParamWaiter {
86 limitlog: NotTooOften::new(LOG_MINUTES*60),
87 last_etag: RefCell::new(String::new()),
88 epoch: epoch,
89 config: config,
89 } 90 }
90 hyper::client::Client::new(&self.handle).request(req)
91 // XXX how do we do this?
92 // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
93 } 91 }
94 92
95 93
94 async fn keep_waiting(&mut self) {
95 loop {
96 self.wait_updates().await;
97 }
98 }
99
100 async fn wait_updates(&mut self) {
101
102 let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config");
103 let mut req = hyper::Request::new(hyper::Method::Get, uri);
104 req.headers_mut().insert(hyper::header::ETAG, self.last_etag.borrow());
105 let resp = hyper::Client::new(&self.handle).request(req).await?;
106 let b = resp.body().concat2().await?;
107 let new_params = self.handle_response(b)?;
108 self.chan.tell(Publish{msg: new_params, topic: "params".into()}, None);
109 }
110
96 fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> { 111 fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> {
112 #[derive(Deserialize, Debug)]
113 struct Response {
114 // sent as an opaque etag: header. Has format "epoch-nonce",
115 // responses where the epoch do not match ParamWaiter::epoch are dropped
116 etag: String,
117 params: Params,
118 }
119
97 let text = String::from_utf8_lossy(buf.as_ref()); 120 let text = String::from_utf8_lossy(buf.as_ref());
98 match status { 121 match status {
99 hyper::StatusCode::Ok => { 122 hyper::StatusCode::Ok => {
100 // new params 123 // new params
101 let r: Response = serde_json::from_str(&text)?; 124 let r: Response = serde_json::from_str(&text)?;
127 let p = atomicwrites::AtomicFile::new(&self.config.PARAMS_FILE, atomicwrites::AllowOverwrite); 150 let p = atomicwrites::AtomicFile::new(&self.config.PARAMS_FILE, atomicwrites::AllowOverwrite);
128 p.write(|f| { 151 p.write(|f| {
129 serde_json::to_writer(f, params) 152 serde_json::to_writer(f, params)
130 }); 153 });
131 } 154 }
132
133 pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {
134 let rcself = Rc::new(ParamWaiter::new(config, handle));
135
136 let dur = Duration::from_millis(4000);
137 for _ in Interval::new(dur, &rcself.handle).unwrap() {
138 // fetch params
139 // TODO - skip if inflight.
140 let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?;
141 let status = r.status();
142 let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?;
143 if let Ok(params) = rcself.handle_response(b, status) {
144 stream_yield!(params);
145 }
146 }
147 Ok(())
148 }
149
150 fn new(config: Config, handle: Handle) -> Self {
151 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
152 rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b);
153 let epoch = base64::encode(&b);
154
155 ParamWaiter {
156 limitlog: NotTooOften::new(LOG_MINUTES*60),
157 last_etag: RefCell::new(String::new()),
158 epoch: epoch,
159 config: config,
160 handle: handle,
161 }
162 }
163
164 } 155 }
165 156
157 impl Actor for ParamWaiter {
158
159 fn post_start(&mut self, ctx: &Context<Self::Msg>) {
160 self.chan = channel("params", &ctx.system).unwrap();
161 ctx.run(self.wait_updates());
162 }
163 }
164
165 // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {
166 // let rcself = Rc::new(ParamWaiter::new(config, handle));
167
168 // let dur = Duration::from_millis(4000);
169 // for _ in Interval::new(dur, &rcself.handle).unwrap() {
170 // // fetch params
171 // // TODO - skip if inflight.
172 // let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?;
173 // let status = r.status();
174 // let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?;
175 // if let Ok(params) = rcself.handle_response(b, status) {
176 // stream_yield!(params);
177 // }
178 // }
179 // Ok(())
180 // }
181