diff rust/src/params.rs @ 633:490e9e15b98c rust

move some bits to riker
author Matt Johnston <matt@ucc.asn.au>
date Wed, 04 Sep 2019 23:24:13 +0800
parents bde302def78e
children a5721c02d3ee
line wrap: on
line diff
--- a/rust/src/params.rs	Thu Aug 22 23:59:50 2019 +0800
+++ b/rust/src/params.rs	Wed Sep 04 23:24:13 2019 +0800
@@ -8,15 +8,19 @@
 use std::fs::File;
 use std::io::Read;
 
+use serde::{Serialize,Deserialize};
 
-use rand::Rng;
+use rand::rngs::{StdRng, OsRng};
+use rand::{RngCore, SeedableRng};
+
 use std::str::FromStr;
 use hyper;
-use hyper::header::{Headers, ETag, EntityTag};
 use hyper::client::Client;
 
-use crate::types::*;
-use ::Config;
+use riker::actors::*;
+
+use super::types::*;
+use super::config::Config;
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
 pub struct Params {
@@ -54,25 +58,17 @@
         Self::try_load(&config.PARAMS_FILE)
             .unwrap_or_else(|_| Params::defaults())
     }
+
 }
 
-#[derive(Deserialize, Debug)]
-struct Response {
-    // sent as an opaque etag: header. Has format "epoch-nonce",
-    // responses where the epoch do not match ParamWaiter::epoch are dropped
-    etag: String,
-    params: Params,
-}
-
-#[derive(Clone)]
 pub struct ParamWaiter {
     limitlog: NotTooOften,
     // last_etag is used for long-polling.
     last_etag: RefCell<String>,
     epoch: String,
+    chan: ChannelRef<Params>,
 
     config: Config,
-    handle: Handle,
 }
 
 const LOG_MINUTES: u64 = 15;
@@ -80,20 +76,47 @@
 const TIMEOUT_MINUTES: u64 = 5;
 
 impl ParamWaiter {
-    fn make_request(&self) -> hyper::client::FutureResponse {
-        let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config");
-        let mut req = hyper::client::Request::new(hyper::Method::Get, uri);
-        {
-            let headers = req.headers_mut();
-            headers.set(hyper::header::ETag(EntityTag::new(false, self.last_etag.borrow().clone())));
+
+    fn new(config: Config) -> Self {
+        let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
+        OsRng.fill_bytes(&mut b);
+        let epoch = base64::encode(&b);
+
+        ParamWaiter {
+            limitlog: NotTooOften::new(LOG_MINUTES*60),
+            last_etag: RefCell::new(String::new()),
+            epoch: epoch,
+            config: config,
         }
-        hyper::client::Client::new(&self.handle).request(req)
-        // XXX how do we do this?
-        // req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
     }
 
 
+    async fn keep_waiting(&mut self) {
+        loop {
+            self.wait_updates().await;
+        }
+    }
+
+    async fn wait_updates(&mut self) {
+
+        let uri = self.config.SETTINGS_URL.parse().expect("Bad SETTINGS_URL in config");
+        let mut req = hyper::Request::new(hyper::Method::Get, uri);
+        req.headers_mut().insert(hyper::header::ETAG, self.last_etag.borrow());
+        let resp = hyper::Client::new(&self.handle).request(req).await?;
+        let b = resp.body().concat2().await?;
+        let new_params = self.handle_response(b)?;
+        self.chan.tell(Publish{msg: new_params, topic: "params".into()}, None);
+    }
+
     fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> {
+        #[derive(Deserialize, Debug)]
+        struct Response {
+            // sent as an opaque etag: header. Has format "epoch-nonce",
+            // responses where the epoch do not match ParamWaiter::epoch are dropped
+            etag: String,
+            params: Params,
+        }
+
         let text = String::from_utf8_lossy(buf.as_ref());
         match status {
             hyper::StatusCode::Ok => {
@@ -129,37 +152,30 @@
             serde_json::to_writer(f, params)
         });
     }
+}
 
-    pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {
-        let rcself = Rc::new(ParamWaiter::new(config, handle));
+impl Actor for ParamWaiter {
 
-        let dur = Duration::from_millis(4000);
-        for _ in Interval::new(dur, &rcself.handle).unwrap() {
-            // fetch params
-            // TODO - skip if inflight.
-            let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?;
-            let status = r.status();
-            let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?;
-            if let Ok(params) = rcself.handle_response(b, status) {
-                stream_yield!(params);
-            }
-        }
-        Ok(())
+    fn post_start(&mut self, ctx: &Context<Self::Msg>) {
+        self.chan = channel("params", &ctx.system).unwrap();
+        ctx.run(self.wait_updates());
     }
-
-    fn new(config: Config, handle: Handle) -> Self {
-        let mut b = [0u8; 15]; // 15 bytes -> 20 characters base64
-        rand::OsRng::new().expect("Opening system RNG failed").fill_bytes(&mut b);
-        let epoch = base64::encode(&b);
-
-        ParamWaiter {
-            limitlog: NotTooOften::new(LOG_MINUTES*60),
-            last_etag: RefCell::new(String::new()),
-            epoch: epoch,
-            config: config,
-            handle: handle,
-        }
-    }
-
 }
 
+    // pub fn stream(config: Config, handle: Handle) -> Result<(), TemplogError> {
+    //     let rcself = Rc::new(ParamWaiter::new(config, handle));
+
+    //     let dur = Duration::from_millis(4000);
+    //     for _ in Interval::new(dur, &rcself.handle).unwrap() {
+    //         // fetch params
+    //         // TODO - skip if inflight.
+    //         let r = await!(rcself.make_request()).map_err(|e| TemplogError::new_hyper("response", e))?;
+    //         let status = r.status();
+    //         let b = await!(r.body().concat2()).map_err(|e| TemplogError::new_hyper("body", e))?;
+    //         if let Ok(params) = rcself.handle_response(b, status) {
+    //             stream_yield!(params);
+    //         }
+    //     }
+    //     Ok(())
+    // }
+