comparison rust/src/paramwaiter.rs @ 614:e1bab5b36352 rust

using some refcells for the paramwaiter
author Matt Johnston <matt@ucc.asn.au>
date Tue, 07 Mar 2017 23:04:02 +0800
parents 5c2b0d47bb83
children
comparison
equal deleted inserted replaced
613:5c2b0d47bb83 614:e1bab5b36352
7 use std::io; 7 use std::io;
8 use std::str; 8 use std::str;
9 use std::rc::Rc; 9 use std::rc::Rc;
10 use std::sync::{Arc,Mutex}; 10 use std::sync::{Arc,Mutex};
11 use std::error::Error; 11 use std::error::Error;
12 use std::cell::Cell; 12 use std::cell::{Cell,RefCell};
13 13
14 use tokio_core::reactor::Interval; 14 use tokio_core::reactor::Interval;
15 use tokio_core::reactor::Handle; 15 use tokio_core::reactor::Handle;
16 use tokio_curl::Session; 16 use tokio_curl::Session;
17 use futures::{Stream,Future,future}; 17 use futures::{Stream,Future,future};
18 use curl::easy::Easy; 18 use curl::easy::Easy;
19 use curl::easy;
19 20
20 use types::*; 21 use types::*;
21 use ::Config; 22 use ::Config;
22 23
23 #[derive(Deserialize, Debug)] 24 #[derive(Deserialize, Debug)]
27 } 28 }
28 29
29 #[derive(Clone)] 30 #[derive(Clone)]
30 pub struct ParamWaiter { 31 pub struct ParamWaiter {
31 limitlog: NotTooOften, 32 limitlog: NotTooOften,
32 epoch_tag: String, 33 epoch_tag: RefCell<String>,
33 session: Option<Session>, 34 session: RefCell<Option<Session>>,
35
34 config: Config, 36 config: Config,
35 handle: Handle, 37 handle: Handle,
36 } 38 }
37 39
38 const LOG_MINUTES: u64 = 15; 40 const LOG_MINUTES: u64 = 15;
39 const MAX_RESPONSE_SIZE: usize = 10000; 41 const MAX_RESPONSE_SIZE: usize = 10000;
42 const TIMEOUT_MINUTES: u64 = 5;
40 43
41 impl ParamWaiter { 44 impl ParamWaiter {
42 pub fn new(config: &Config, handle: &Handle) -> Self { 45 fn make_request(&self) -> Easy {
43 ParamWaiter {
44 limitlog: NotTooOften::new(LOG_MINUTES*60),
45 epoch_tag: String::new(),
46 session: None,
47 config: config.clone(),
48 handle: handle.clone(),
49 }
50 }
51
52 fn make_req(&self) -> Easy {
53 let mut req = Easy::new(); 46 let mut req = Easy::new();
54 req.get(true).unwrap(); 47 req.get(true).unwrap();
55 // supposedly req.url won't fail, checking is later? 48 // supposedly req.url won't fail, checking is later?
56 req.url(&self.config.SETTINGS_URL).unwrap(); 49 req.url(&self.config.SETTINGS_URL).unwrap();
50
51 req.timeout(Duration::new(TIMEOUT_MINUTES * 60, 0)).unwrap();
52
53 // http header
54 // etag: epoch-tag
55 let e = self.epoch_tag.borrow();
56 if !e.is_empty() {
57 let mut list = easy::List::new();
58 let hd = format!("etag: {}", *e);
59 list.append(&hd).unwrap();
60 req.http_headers(list).unwrap();
61 }
62
57 req 63 req
58 } 64 }
59 65
60 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> { 66 fn handle_response(&self, req: &mut Easy, buf: &Vec<u8>) -> Result<Option<Params>, TemplogError> {
61 let text = String::from_utf8_lossy(buf).to_string(); 67 let text = String::from_utf8_lossy(buf).to_string();
62 let resp = req.response_code()?; 68 let resp = req.response_code()?;
63 match resp { 69 match resp {
64 200 => { 70 200 => {
65 // new params 71 // new params
66 let r: Response = serde_json::from_str(&text)?; 72 let r: Response = serde_json::from_str(&text)?;
67 // XXX 73 *self.epoch_tag.borrow_mut() = r.epoch_tag;
68 //self.epoch_tag = r.epoch_tag;
69 Ok(Some(r.params)) 74 Ok(Some(r.params))
70 } 75 }
71 304 => Ok(None), // unmodified (long polling timeout at the server) 76 304 => Ok(None), // unmodified (long polling timeout at the server)
72 _ => { 77 _ => {
73 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text))) 78 Err(TemplogError::new(&format!("Wrong server response code {}: {}", resp, text)))
74 }, 79 },
75 } 80 }
76 } 81 }
77 82
78 fn step(&mut self) -> Box<Future<Item=Option<Params>, Error=TemplogError>> { 83 fn step(rself: Rc<Self>) -> Box<Future<Item=Option<Params>, Error=TemplogError>> {
79 if self.session.is_none() { 84
80 self.session = Some(Session::new(self.handle.clone())) 85 if rself.session.borrow().is_none() {
86 *rself.session.borrow_mut() = Some(Session::new(rself.handle.clone()));
81 } 87 }
88 let ses = rself.session.borrow().clone().unwrap();
82 89
83 let mut req = self.make_req(); 90 let mut req = rself.make_request();
84 let buf = Arc::new(Mutex::new(Vec::new())); 91 let buf = Arc::new(Mutex::new(Vec::new()));
85 92
86 let dst = buf.clone(); 93 let dst = buf.clone();
87 req.write_function(move |data| { 94 req.write_function(move |data| {
88 let mut dst = dst.lock().unwrap(); 95 let mut dst = dst.lock().unwrap();
93 } else { 100 } else {
94 Ok(data.len()) 101 Ok(data.len())
95 } 102 }
96 }).unwrap(); 103 }).unwrap();
97 104
98 // XXX too many clones 105 let s = ses.perform(req)
99 let se = self.clone();
100 let s = self.clone().session.unwrap().perform(req)
101 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error())) 106 .map_err(|e| TemplogError::new_io("tokio curl error", e.into_error()))
102 .and_then(move |mut es| { 107 .and_then(move |mut rq| {
103 let result = buf.lock().unwrap(); 108 let result = buf.lock().unwrap();
104 se.handle_response(&mut es, &result) 109 rself.handle_response(&mut rq, &result)
105 }); 110 });
106 Box::new(s) 111 Box::new(s)
107 112
108 /* 113 /*
109 let mut p = Params::defaults(); 114 let mut p = Params::defaults();
110 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>(); 115 p.fridge_setpoint = 17.0 + 4.0*rand::random::<f32>();
111 future::ok(Some(p)).boxed() 116 future::ok(Some(p)).boxed()
112 */ 117 */
113 } 118 }
114 119
115 pub fn stream(&mut self) -> Box<Stream<Item=Params, Error=TemplogError>> { 120 pub fn new(config: &Config, handle: &Handle) -> Self {
121 ParamWaiter {
122 limitlog: NotTooOften::new(LOG_MINUTES*60),
123 epoch_tag: RefCell::new(String::new()),
124 session: RefCell::new(None),
125 config: config.clone(),
126 handle: handle.clone(),
127 }
128 }
129
130
131 pub fn stream(config: &Config, handle: &Handle) -> Box<Stream<Item=Params, Error=TemplogError>> {
132 let rcself = Rc::new(ParamWaiter::new(config, handle));
116 133
117 let dur = Duration::from_millis(4000); 134 let dur = Duration::from_millis(4000);
118 let mut s = Rc::new(self.clone()); 135 let i = Interval::new(dur, &rcself.handle).unwrap()
119 let i = Interval::new(dur, &self.handle).unwrap()
120 .map_err(|e| TemplogError::new_io("interval failed", e)) 136 .map_err(|e| TemplogError::new_io("interval failed", e))
121 .and_then(move |()| { 137 .and_then(move |()| {
122 let ss = Rc::get_mut(&mut s).unwrap(); 138 Self::step(rcself.clone())
123 ss.step()
124 }) 139 })
125 // throw away None params 140 // throw away None params
126 .filter_map(|p| p); 141 .filter_map(|p| p);
127 Box::new(i) 142 Box::new(i)
128 } 143 }