comparison rust/src/paramwaiter.rs @ 611:f3e39e2107fd rust

still doesn't compile, improvements to TemplogError and tokio curl though
author Matt Johnston <matt@ucc.asn.au>
date Tue, 28 Feb 2017 22:58:47 +0800
parents 7bda01659426
children 5c2b0d47bb83
comparison
equal deleted inserted replaced
610:af0dac00d40b 611:f3e39e2107fd
1 extern crate tokio_core; 1 extern crate tokio_core;
2 extern crate futures; 2 extern crate futures;
3 extern crate rand; 3 extern crate rand;
4 extern crate serde_json;
4 5
5 use std::time::Duration; 6 use std::time::Duration;
6 use std::io; 7 use std::io;
8 use std::str;
9 use std::rc::Rc;
10 use std::sync::{Arc,Mutex};
11 use std::error::Error;
7 12
8 use tokio_core::reactor::Interval; 13 use tokio_core::reactor::Interval;
9 use tokio_core::reactor::Handle; 14 use tokio_core::reactor::Handle;
10 use tokio_curl::Session; 15 use tokio_curl::Session;
11 use futures::{Stream,Future,future}; 16 use futures::{Stream,Future,future};
17 use curl::easy::Easy;
18
12 use types::*; 19 use types::*;
13 use curl::Easy;
14 use ::Config; 20 use ::Config;
15 21
22 #[derive(Clone)]
16 pub struct ParamWaiter { 23 pub struct ParamWaiter {
17 limitlog: NotTooOften, 24 limitlog: NotTooOften,
18 epoch_tag: String, 25 epoch_tag: String,
19 session: Option<Session>, 26 session: Option<Session>,
20 config: Config, 27 config: Config,
28 handle: Handle,
21 } 29 }
22 30
23 const LOGMINUTES: u64 = 15; 31 const LOG_MINUTES: u64 = 15;
32 const MAX_RESPONSE_SIZE: usize = 10000;
24 33
25 impl ParamWaiter { 34 impl ParamWaiter {
26 pub fn new(config: &Config) -> Self { 35 pub fn new(config: &Config, handle: &Handle) -> Self {
27 ParamWaiter { 36 ParamWaiter {
28 limitlog: NotTooOften::new(LOGMINUTES*60), 37 limitlog: NotTooOften::new(LOG_MINUTES*60),
29 epoch_tag: String::new(), 38 epoch_tag: String::new(),
30 session: None, 39 session: None,
31 config: config.clone(), 40 config: config.clone(),
41 handle: handle.clone(),
32 } 42 }
33 } 43 }
34 44
35 fn make_req(&self) -> Easy { 45 fn make_req(&self) -> Easy {
36 let mut req = Easy::new(); 46 let mut req = Easy::new();
37 req.get(true).unwrap(); 47 req.get(true).unwrap();
38 req.url(config.SETTINGS_URL); 48 // supposedly req.url won't fail, checking is later?
49 req.url(&self.config.SETTINGS_URL).unwrap();
50 req
39 } 51 }
40 52
41 fn step(&mut self, handle: &Handle) -> Box<Future<Item=Option<Params>, Error=io::Error>> { 53 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
54 let text = String::from_utf8_lossy(buf).to_string();
55 let resp = req.response_code()?;
56 match resp {
57 200 => Ok(Some(serde_json::from_str(&text)?)), // new params
58 304 => Ok(None), // unmodified (long polling timeout at the server)
59 _ => {
60 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
61 },
62 }
63 }
64
65 fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
42 if self.session.is_none() { 66 if self.session.is_none() {
43 self.session = Some(Session::new(handle.clone())) 67 self.session = Some(Session::new(self.handle.clone()))
44 } 68 }
45 69
46 let req = self.make_req(); 70 let mut req = self.make_req();
47 /* 71 let buf = Arc::new(Mutex::new(Vec::new()));
48 self.session.unwrap().perform(self.make_req())
49 .and_then(||)
50 */
51 72
73 let dst = buf.clone();
74 req.write_function(move |data| {
75 let mut dst = dst.lock().unwrap();
76 dst.extend_from_slice(data);
77 if dst.len() > MAX_RESPONSE_SIZE {
78 debug!("Too large params response from server: {}", dst.len());
79 Ok(0)
80 } else {
81 Ok(data.len())
82 }
83 }).unwrap();
84
85 // XXX too many clones
86 let se = self.clone();
87 let s = self.clone().session.unwrap().perform(req)
88 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
89 .and_then(move |mut es| {
90 let result = buf.lock().unwrap();
91 se.handle_response(&mut es, &result)
92 });
93 Box::new(s)
94
95 /*
52 let mut p = Params::defaults(); 96 let mut p = Params::defaults();
53 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); 97 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
54 future::ok(p).boxed() 98 future::ok(Some(p)).boxed()
99 */
55 } 100 }
56 101
57 pub fn stream(&mut self, handle: &Handle) -> Box<Stream<Item=Params, Error=io::Error>> { 102 pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> {
58 103
59 let dur = Duration::from_millis(4000); 104 let dur = Duration::from_millis(4000);
60 let i = Interval::new(dur, handle).unwrap() 105 let mut s = Rc::new(self.clone());
106 let i = Interval::new(dur, &self.handle).unwrap()
107 .map_err(|e| TemplogError::new_io("interval failed", e))
61 .and_then(move |()| { 108 .and_then(move |()| {
62 s.step() 109 let ss = Rc::get_mut(&mut s).unwrap();
110 ss.step()
63 }) 111 })
64 // throw away None params 112 // throw away None params
65 .filter_map(|p| p); 113 .filter_map(|p| p);
66 Box::new(i) 114 Box::new(i)
67 } 115 }