comparison rust/src/params.rs @ 638:a9f353f488d0 rust

fix channels
author Matt Johnston <matt@ucc.asn.au>
date Sat, 09 Nov 2019 11:35:59 +0800
parents 43eb3cfdf769
children 89818a14648b
comparison
equal deleted inserted replaced
637:1e147b3c2c55 638:a9f353f488d0
67 pub struct ParamWaiter { 67 pub struct ParamWaiter {
68 limitlog: NotTooOften, 68 limitlog: NotTooOften,
69 // last_etag is used for long-polling. 69 // last_etag is used for long-polling.
70 last_etag: RefCell<String>, 70 last_etag: RefCell<String>,
71 epoch: String, 71 epoch: String,
72 chan: Option<ChannelRef<Params>>, // TODO: a way to avoid Option? 72 notify: ChannelRef<Params>,
73 73
74 config: Config, 74 config: Config,
75 } 75 }
76 76
77 const LOG_MINUTES: u64 = 15; 77 const LOG_MINUTES: u64 = 15;
78 const MAX_RESPONSE_SIZE: usize = 10000; 78 const MAX_RESPONSE_SIZE: usize = 10000;
79 const TIMEOUT_MINUTES: u64 = 5; 79 const TIMEOUT_MINUTES: u64 = 5;
80 80
81 impl ParamWaiter { 81 impl ParamWaiter {
82 82 pub fn new_actor((config, notify): (Config, ChannelRef<Params>)) -> Self {
83 pub fn new(config: Config) -> Self { 83 Self::new(config, notify)
84 }
85
86 pub fn new(config: Config, notify: ChannelRef<Params>) -> Self {
84 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 87 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
85 OsRng.fill_bytes(&mut b); 88 OsRng.fill_bytes(&mut b);
86 let epoch = base64::encode(&b); 89 let epoch = base64::encode(&b);
87 90
88 ParamWaiter { 91 ParamWaiter {
89 limitlog: NotTooOften::new(LOG_MINUTES*60), 92 limitlog: NotTooOften::new(LOG_MINUTES*60),
90 last_etag: RefCell::new(String::new()), 93 last_etag: RefCell::new(String::new()),
91 epoch: epoch, 94 epoch: epoch,
92 chan: None,
93 config: config, 95 config: config,
96 notify: notify,
94 } 97 }
95 } 98 }
96 99
97 async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { 100 async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> {
101 debug!("wait_updates {}", uri);
98 let req = hyper::Request::get(uri) 102 let req = hyper::Request::get(uri)
99 .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) 103 .header(hyper::header::ETAG, etag)//*self.last_etag.borrow())
100 .body(hyper::Body::from("")).unwrap(); 104 .body(hyper::Body::from("")).unwrap();
101 105
102 // TODO timeout? 106 // TODO timeout?
145 } 149 }
146 } 150 }
147 151
148 fn write_params(&self, params: &Params) { 152 fn write_params(&self, params: &Params) {
149 let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite); 153 let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite);
150 p.write(|f| { 154 let r = p.write(|f| {
151 serde_json::to_writer(f, params) 155 serde_json::to_writer(f, params)
152 }); 156 });
157 if let Err(e) = r {
158 // XXX notify?
159 error!("Couldn't write to {}: {}", self.config.params_file, e)
160 };
153 } 161 }
154 162
155 fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> { 163 fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> {
156 let url = self.config.settings_url.clone(); 164 let url = self.config.settings_url.clone();
157 let etag = self.last_etag.borrow().clone(); 165 let etag = self.last_etag.borrow().clone();
158 let h = ctx.run(async move { 166 let h = ctx.run(async move {
159 Self::wait_updates(&url, &etag).await 167 Self::wait_updates(&url, &etag).await
160 }).expect("spawn failed"); // XXX error handling 168 }).expect("spawn failed"); // XXX error handling
161 let (chunk, stat) = block_on(h)?; 169 let (chunk, stat) = block_on(h)?;
162 let new_params = self.handle_response(chunk, stat)?; 170 let new_params = self.handle_response(chunk, stat)?;
163 self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None); 171 self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None);
164 Ok(()) 172 Ok(())
165 } 173 }
166 } 174 }
167 175
168 #[derive(Clone,Debug)] 176 #[derive(Clone,Debug)]
184 }); 192 });
185 } 193 }
186 } 194 }
187 195
188 fn pre_start(&mut self, ctx: &Context<Self::Msg>) { 196 fn pre_start(&mut self, ctx: &Context<Self::Msg>) {
189 self.chan = Some(channel("params", &ctx.system).unwrap());
190 ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); 197 ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams);
191 } 198 }
192 } 199 }
193 200
194 // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { 201 // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {