diff rust/src/params.rs @ 639:89818a14648b rust tip

- switch to using anyhow for errors, surf for http runs but surf has problems
author Matt Johnston <matt@ucc.asn.au>
date Thu, 28 Nov 2019 23:57:00 +0800
parents a9f353f488d0
children
line wrap: on
line diff
--- a/rust/src/params.rs	Sat Nov 09 11:35:59 2019 +0800
+++ b/rust/src/params.rs	Thu Nov 28 23:57:00 2019 +0800
@@ -1,9 +1,9 @@
+use anyhow::{Result,bail};
+
 use std::time::Duration;
 
 use std::str;
 
-
-
 use std::cell::{RefCell};
 use std::fs::File;
 use std::io::Read;
@@ -13,12 +13,9 @@
 use rand::rngs::{OsRng};
 use rand::{RngCore};
 
-
-use hyper;
+use async_std::task;
 
-// for try_concat()
-use futures::stream::TryStreamExt;
-use futures::executor::block_on;
+use surf;
 
 use riker::actors::*;
 
@@ -51,7 +48,7 @@
             }
     }
 
-    fn try_load(filename: &str) -> Result<Params, TemplogError> {
+    fn try_load(filename: &str) -> Result<Params> {
         let mut s = String::new();
         File::open(filename)?.read_to_string(&mut s)?;
         Ok(serde_json::from_str(&s)?)
@@ -97,21 +94,23 @@
         }
     }
 
-    async fn wait_updates(uri: &str, etag: &str) -> Result<(hyper::Chunk, hyper::StatusCode), TemplogError> {
+    async fn wait_updates(uri: &str, etag: &str) -> Result<(Vec<u8>, u16)> {
         debug!("wait_updates {}", uri);
-        let req = hyper::Request::get(uri)
-            .header(hyper::header::ETAG, etag)//*self.last_etag.borrow())
-            .body(hyper::Body::from("")).unwrap();
+        // XXX Workaround for https://github.com/dtolnay/anyhow/issues/35
+        let mut resp = match surf::get(uri)
+            .set_header("etag", etag).await {
+                Ok(r) => r,
+                Err(e) => bail!(e),
+            };
 
         // TODO timeout?
-        let resp = hyper::Client::new().request(req).await?;
         let status = resp.status();
-        let chunk = resp.into_body().try_concat().await?;
+        let bytes = resp.body_bytes().await?;
 
-        Ok((chunk, status))
+        Ok((bytes, status.into()))
     }
 
-    fn handle_response(&self, buf : hyper::Chunk, status: hyper::StatusCode) -> Result<Params, TemplogError> {
+    fn handle_response(&self, contents : &Vec<u8>, status: u16) -> Result<Params> {
 
         #[derive(Deserialize, Debug)]
         struct Response {
@@ -122,9 +121,10 @@
         }
 
         match status {
-            hyper::StatusCode::OK => {
+            200 => {
+                // 200 OK
                 // new params
-                let r: Response = serde_json::from_slice(&buf)?;
+                let r: Response = serde_json::from_slice(contents)?;
                 let mut le = self.last_etag.borrow_mut();
                 *le = r.etag;
 
@@ -135,16 +135,17 @@
                         return Ok(r.params);
                     }
                 }
-                Err(TemplogError::new(&format!("Bad epoch from server '{}' expected '{}'", *le, self.epoch)))
+                bail!("Bad epoch from server '{}' expected '{}'", *le, self.epoch);
             }
-            hyper::StatusCode::NOT_MODIFIED => {
+            304 => {
+                // 304 Not Modified
                 // XXX this isn't really an error. Should handle_response() return 
                 // Result<Option<Params>, TemplogError> instead?
-                Err(TemplogError::new("304 unmodified (long polling timeout at the server)"))
+                bail!("304 unmodified (long polling timeout at the server)");
             },
             _ => {
-                let text = String::from_utf8_lossy(buf.as_ref());
-                Err(TemplogError::new(&format!("Wrong server response code {}: {}", status.as_u16(), text)))
+                let text = String::from_utf8_lossy(contents);
+                bail!("Wrong server response code {}: {}", status, text);
             },
         }
     }
@@ -160,14 +161,14 @@
         };
     }
 
-    fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<(), TemplogError> {
+    fn do_poll(&mut self, ctx: &Context<<Self as Actor>::Msg>) -> Result<()> {
         let url = self.config.settings_url.clone();
         let etag = self.last_etag.borrow().clone();
         let h = ctx.run(async move { 
             Self::wait_updates(&url, &etag).await 
         }).expect("spawn failed"); // XXX error handling
-        let (chunk, stat) = block_on(h)?;
-        let new_params = self.handle_response(chunk, stat)?;
+        let (contents, stat) = task::block_on(h)?;
+        let new_params = self.handle_response(&contents, stat)?;
         self.notify.tell(Publish{msg: new_params, topic: "params".into()}, None);
         Ok(())
     }