Mercurial > templog
comparison rust/src/params.rs @ 638:a9f353f488d0 rust
fix channels
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Sat, 09 Nov 2019 11:35:59 +0800 |
parents | 43eb3cfdf769 |
children | 89818a14648b |
comparison
equal
deleted
inserted
replaced
637:1e147b3c2c55 | 638:a9f353f488d0 |
---|---|
67 pub struct ParamWaiter { | 67 pub struct ParamWaiter { |
68 limitlog: NotTooOften, | 68 limitlog: NotTooOften, |
69 // last_etag is used for long-polling. | 69 // last_etag is used for long-polling. |
70 last_etag: RefCell<String>, | 70 last_etag: RefCell<String>, |
71 epoch: String, | 71 epoch: String, |
72 chan: Option<ChannelRef<Params>>, // TODO: a way to avoid Option? | 72 notify: ChannelRef<Params>, |
73 | 73 |
74 config: Config, | 74 config: Config, |
75 } | 75 } |
76 | 76 |
77 const LOG_MINUTES: u64 = 15; | 77 const LOG_MINUTES: u64 = 15; |
78 const MAX_RESPONSE_SIZE: usize = 10000; | 78 const MAX_RESPONSE_SIZE: usize = 10000; |
79 const TIMEOUT_MINUTES: u64 = 5; | 79 const TIMEOUT_MINUTES: u64 = 5; |
80 | 80 |
81 impl ParamWaiter { | 81 impl ParamWaiter { |
82 | 82 pub fn new_actor((config, notify): (Config, ChannelRef<Params>)) -> Self { |
83 pub fn new(config: Config) -> Self { | 83 Self::new(config, notify) |
84 } | |
85 | |
86 pub fn new(config: Config, notify: ChannelRef<Params>) -> Self { | |
84 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 | 87 let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64 |
85 OsRng.fill_bytes(&mut b); | 88 OsRng.fill_bytes(&mut b); |
86 let epoch = base64::encode(&b); | 89 let epoch = base64::encode(&b); |
87 | 90 |
88 ParamWaiter { | 91 ParamWaiter { |
89 limitlog: NotTooOften::new(LOG_MINUTES*60), | 92 limitlog: NotTooOften::new(LOG_MINUTES*60), |
90 last_etag: RefCell::new(String::new()), | 93 last_etag: RefCell::new(String::new()), |
91 epoch: epoch, | 94 epoch: epoch, |
92 chan: None, | |
93 config: config, | 95 config: config, |
96 notify: notify, | |
94 } | 97 } |
95 } | 98 } |
96 | 99 |
97 async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { | 100 async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> { |
101 debug!("wait_updates {}", uri); | |
98 let req = hyper::Request::get(uri) | 102 let req = hyper::Request::get(uri) |
99 .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) | 103 .header(hyper::header::ETAG, etag)//*self.last_etag.borrow()) |
100 .body(hyper::Body::from("")).unwrap(); | 104 .body(hyper::Body::from("")).unwrap(); |
101 | 105 |
102 // TODO timeout? | 106 // TODO timeout? |
145 } | 149 } |
146 } | 150 } |
147 | 151 |
148 fn write_params(&self, params: &Params) { | 152 fn write_params(&self, params: &Params) { |
149 let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite); | 153 let p = atomicwrites::AtomicFile::new(&self.config.params_file, atomicwrites::AllowOverwrite); |
150 p.write(|f| { | 154 let r = p.write(|f| { |
151 serde_json::to_writer(f, params) | 155 serde_json::to_writer(f, params) |
152 }); | 156 }); |
157 if let Err(e) = r { | |
158 // XXX notify? | |
159 error!("Couldn't write to {}: {}", self.config.params_file, e) | |
160 }; | |
153 } | 161 } |
154 | 162 |
155 fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> { | 163 fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> { |
156 let url = self.config.settings_url.clone(); | 164 let url = self.config.settings_url.clone(); |
157 let etag = self.last_etag.borrow().clone(); | 165 let etag = self.last_etag.borrow().clone(); |
158 let h = ctx.run(async move { | 166 let h = ctx.run(async move { |
159 Self::wait_updates(&url, &etag).await | 167 Self::wait_updates(&url, &etag).await |
160 }).expect("spawn failed"); // XXX error handling | 168 }).expect("spawn failed"); // XXX error handling |
161 let (chunk, stat) = block_on(h)?; | 169 let (chunk, stat) = block_on(h)?; |
162 let new_params = self.handle_response(chunk, stat)?; | 170 let new_params = self.handle_response(chunk, stat)?; |
163 self.chan.as_ref().unwrap().tell(Publish{msg: new_params, topic: "params".into()}, None); | 171 self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None); |
164 Ok(()) | 172 Ok(()) |
165 } | 173 } |
166 } | 174 } |
167 | 175 |
168 #[derive(Clone,Debug)] | 176 #[derive(Clone,Debug)] |
184 }); | 192 }); |
185 } | 193 } |
186 } | 194 } |
187 | 195 |
188 fn pre_start(&mut self, ctx: &Context<Self::Msg>) { | 196 fn pre_start(&mut self, ctx: &Context<Self::Msg>) { |
189 self.chan = Some(channel("params", &ctx.system).unwrap()); | |
190 ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); | 197 ctx.schedule_once(Duration::from_secs(1), ctx.myself(), None, PollForParams); |
191 } | 198 } |
192 } | 199 } |
193 | 200 |
194 // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { | 201 // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> { |