Mercurial > templog
changeset 253:0a1b642e3086
long polling config updates
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Mon, 08 Jun 2015 22:29:46 +0800 |
parents | b6079cb0c665 |
children | 8ef52f27cf95 |
files | py/config.py py/configwaiter.py py/params.py py/tempserver.py py/uploader.py py/utils.py web/log.py web/settings.py web/templog.py |
diffstat | 9 files changed, 198 insertions(+), 38 deletions(-) [+] |
line wrap: on
line diff
--- a/py/config.py Wed May 27 23:46:06 2015 +0800 +++ b/py/config.py Mon Jun 08 22:29:46 2015 +0800 @@ -20,8 +20,9 @@ INTERNAL_TEMPERATURE = '/sys/class/thermal/thermal_zone0/temp' HMAC_KEY = "a key" -#UPDATE_URL = 'https://matt.ucc.asn.au/test/templog/update' -UPDATE_URL = 'https://evil.ucc.asn.au/~matt/templog/update' +SERVER_URL = 'https://evil.ucc.asn.au/~matt/templog/update' +UPDATE_URL = "%s/update" % SERVER_URL +SETTINGS_URL = "%s/update" % SERVER_URL # site-local values overridden in localconfig, eg WORT_NAME, HMAC_KEY try:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/py/configwaiter.py Mon Jun 08 22:29:46 2015 +0800 @@ -0,0 +1,43 @@ +class ConfigWaiter(object): + """ Waits for config updates from the server. http long polling """ + + def __init__(self, server): + self.server = server + self.epoch_tag = None + self.http_session = aiohttp.ClientSession() + + @asyncio.coroutine + def run(self): + # wait until someting has been uploaded (the uploader itself waits 5 seconds) + yield from asyncio.sleep(10) + while True: + yield from self.do() + + # avoid spinning too fast + yield from server.sleep(1) + + @asyncio.coroutine + def do(self): + try: + if self.epoch_tag: + headers = {'etag': self.epoch_tag} + else: + headers = None + + r = yield from asyncio.wait_for( + self.http_session.get(config.SETTINGS_URL, headers=headers), + 300) + if r.status == 200: + resp = yield from asyncio.wait_for(r.json(), 300) + + self.epoch_tag = resp['epoch_tag'] + epoch = self.epoch_tag.split('-')[0] + if self.server.params.receive(resp['params'], epoch): + self.server.reload_signal(True) + + except Exception as e: + E("Error watching config: %s" % str(e)) + + + +
--- a/py/params.py Wed May 27 23:46:06 2015 +0800 +++ b/py/params.py Mon Jun 08 22:29:46 2015 +0800 @@ -3,9 +3,11 @@ import json import signal import io +import tempfile import config from utils import W,L,E,EX +import utils _FIELD_DEFAULTS = { 'fridge_setpoint': 16, @@ -24,6 +26,7 @@ def __init__(self): self.update(_FIELD_DEFAULTS) + self._epoch = None def __getattr__(self, k): return self[k] @@ -45,6 +48,7 @@ if k not in self: raise self.Error("Unknown parameter %s=%s in file '%s'" % (str(k), str(u[k]), getattr(f, 'name', '???'))) self.update(u) + self._epoch = utils.hexnonce() L("Loaded parameters") L(self.save_string()) @@ -60,19 +64,54 @@ W("Missing parameter file, using defaults. %s" % str(e)) return - def _do_save(self, f): - json.dump(self, f, sort_keys=True, indent=4) - f.write('\n') - f.flush() + def get_epoch(self): + return self._epoch + + def receive(self, params, epoch): + """ updates parameters from the server. does some validation, + writes config file to disk. + Returns True on success, False failure + """ + + if epoch != self._epoch: + return + + def same_type(a, b): + ta = type(a) + tb = type(b) + + if ta == int: + ta = float + if tb == int: + tb = float + + return ta == tb - def save(self, f = None): - if f: - return self._do_save(f) - else: - with file(config.PARAMS_FILE, 'w') as f: - return self._do_save(f) + if self.keys() != new_params.keys(): + diff = self.keys() ^ new_params.keys() + E("Mismatching params, %s" % str(diff)) + return False + + for k, v in new_params.items(): + if not same_type(v, self[k]): + E("Bad type for %s" % k) + return False + + dir = os.path.dirname(config.PARAMS_FILE) + try: + t = tempfile.NamedTemporaryFile(prefix='config', + dir = dir, + delete = False) + + t.write(json.dumps(new_params, sort_keys=True, indent=4)+'\n') + name = t.name + t.close() + + os.rename(name, config.PARAMS_FILE) + return True + except Exception as e: + E("Problem: %s" % e) + return False def save_string(self): - s = io.StringIO() - self.save(s) - return s.getvalue() + return json.dumps(self, sort_keys=True, indent=4)
--- a/py/tempserver.py Wed May 27 23:46:06 2015 +0800 +++ b/py/tempserver.py Mon Jun 08 22:29:46 2015 +0800 @@ -32,9 +32,10 @@ self.params = params.Params() self.fridge = fridge.Fridge(self) self.uploader = uploader.Uploader(self) + self.configwaiter = configwaiter.ConfigWaiter(self) self.params.load() self.set_sensors(sensor.make_sensor(self)) - asyncio.get_event_loop().add_signal_handler(signal.SIGHUP, self._reload_signal) + asyncio.get_event_loop().add_signal_handler(signal.SIGHUP, self.reload_signal) return self def __exit__(self, exc_type, exc_value, traceback): @@ -52,6 +53,7 @@ self.fridge.run(), self.sensors.run(), self.uploader.run(), + self.configwaiter.run(), ] loop = asyncio.get_event_loop() @@ -109,10 +111,11 @@ except asyncio.TimeoutError: pass - def _reload_signal(self): + def reload_signal(self, no_file = False): try: - self.params.load() - L("Reloaded.") + if not no_file: + self.params.load() + L("Reloaded.") self._wakeup.set() self._wakeup.clear() except Error as e:
--- a/py/uploader.py Wed May 27 23:46:06 2015 +0800 +++ b/py/uploader.py Mon Jun 08 22:29:46 2015 +0800 @@ -36,6 +36,7 @@ tosend['fridge_name'] = self.server.wort_name tosend['current_params'] = dict(self.server.params) + tosend['current_params_epoch'] = self.server.params.get_epoch() tosend['start_time'] = self.server.start_time tosend['uptime'] = utils.uptime() @@ -53,7 +54,7 @@ send_data = {'data': js_enc.decode(), 'hmac': mac} r = yield from asyncio.wait_for(aiohttp.request('post', config.UPDATE_URL, data=send_data), 60) result = yield from asyncio.wait_for(r.text(), 60) - if result != 'OK': + if r.status == 200 and result != 'OK': raise Exception("Server returned %s" % result) @asyncio.coroutine
--- a/py/utils.py Wed May 27 23:46:06 2015 +0800 +++ b/py/utils.py Mon Jun 08 22:29:46 2015 +0800 @@ -4,6 +4,7 @@ import time import select import logging +import binascii D = logging.debug L = logging.info @@ -133,3 +134,5 @@ except Exception as e: return -1 +def hexnonce(): + return binascii.hexlify(os.urandom(120))
--- a/web/log.py Wed May 27 23:46:06 2015 +0800 +++ b/web/log.py Mon Jun 08 22:29:46 2015 +0800 @@ -20,6 +20,9 @@ import config import atomicfile +import settings + +fridge_settings = settings.Settings() def sensor_rrd_path(s): return '%s/sensor_%s.rrd' % (config.DATA_PATH, str(s)) @@ -245,16 +248,11 @@ return val_ticks + float(val_rem) * tick_secs / tick_wake def write_current_params(current_params): - out = {} - out['params'] = current_params - out['time'] = time.time() - atomicfile.AtomicFile("%s/current_params.txt" % config.DATA_PATH).write( - json.dumps(out, sort_keys=True, indent=4)+'\n') + fridge_settings.update(current_params) def read_current_params(): - p = atomicfile.AtomicFile("%s/current_params.txt" % config.DATA_PATH).read() - dat = json.loads(p) - return dat['params'] + params, epochtag = fridge_settings.get() + return params def parse(params): @@ -278,10 +276,11 @@ # one-off measurements here current_params = params['current_params'] + current_epoch = params['current_params_epoch'] measurements['fridge_on'] = [ (time.time(), params['fridge_on']) ] measurements['fridge_setpoint'] = [ (time.time(), current_params['fridge_setpoint']) ] - write_current_params(current_params) + write_current_params(current_params, current_epoch) for s, vs in measurements.iteritems(): sensor_update(s, vs) @@ -371,10 +370,7 @@ if not same_type(v, _FIELD_DEFAULTS[k]): return "Bad type for %s, %s vs %s" % (k , type(v), type(_FIELD_DEFAULTS[k])) - ret = send_params(params) - if ret is not True: - return "Failed sending params: %s" % ret - + fridge_settings.update(params) return True
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/settings.py Mon Jun 08 22:29:46 2015 +0800 @@ -0,0 +1,61 @@ +import gevent +import fcntl +import hashlib + +class Settings(object): + RAND_SIZE = 15 # 120 bits + + """ Handles state updates from both the web UI and from the fridge client. + The fridge client is canonical. It provides the epoch (apart from 'startepoch'), that + is changed any time the fridge reloads its local config. The fridge only accepts + updates that have the same epoch. + + When the web UI changes it keeps the same epoch but generates a new tag. The fridge sends + its current known tag and waits for it to change. + + content is opaque, presently a dictionary of decoded json + """ + + def __init__(self): + self.event = gevent.event.Event() + self.contents = None + self.epoch = None + self.tag = None + + self.update(self, None, 'startepoch') + + def wait(self, epoch_tag = None, timeout = None): + """ returns false if the timeout was hit """ + if self.epoch_tag() != epoch_tag: + # has alredy changed + return True + return self.event.wait(timeout) + + def epoch_tag(self): + return '%s-%s' % (self.epoch, self.tag) + + def random(self): + return binascii.hexlify(os.urandom(self.RAND_SIZE)) + + def update(self, contents, epoch = None): + """ replaces settings contents and updates waiters if changed """ + if epoch: + if self.epoch == epoch: + return + else: + self.epoch = epoch + + self.tag = self.random() + self.contents = contents + + self.event.set() + self.event.clear() + + def get(self): + """ Returns (contents, epoch-tag) """ + return self.contents, self.epoch_tag() + + + + +
--- a/web/templog.py Wed May 27 23:46:06 2015 +0800 +++ b/web/templog.py Mon Jun 08 22:29:46 2015 +0800 @@ -28,9 +28,8 @@ def run(*args, **argm): argm['server'] = 'gevent' super(TemplogBottle, self).run(*args, **argm) - print "ran custom bottle" -#bottle.default_app.push(TemplogBottle()) +bottle.default_app.push(TemplogBottle()) secure.setup_csrf() @@ -62,7 +61,6 @@ def encode_data(data, mimetype): return 'data:%s;base64,%s' % (mimetype, binascii.b2a_base64(data).rstrip()) - @route('/graph.png') def graph(): response.set_header('Content-Type', 'image/png') @@ -76,12 +74,12 @@ csrf_blob = post_json['csrf_blob'] if not secure.check_csrf_blob(csrf_blob): - bottle.response.status = 403 + response.status = 403 return "Bad csrf" ret = log.update_params(post_json['params']) if not ret is True: - bottle.response.status = 403 + response.status = 403 return ret return "Good" @@ -159,11 +157,26 @@ #var_lookup = environ['mod_ssl.var_lookup'] #return var_lookup("SSL_SERVER_I_DN_O") +@route('/get_settings') +def get_settings(): + response.set_header('Cache-Control', 'no-cache') + req_etag = request.headers.get('etag', None) + if req_etag: + # wait for it to change + if not log.fridge_settings.wait(req_etag, timeout=LONG_POLL_TIMEOUT): + response.status = 304 + return "Nothing happened" + + response.set_header('Content-Type', 'application/json') + contents, epoch_tag = client_settings.get() + return json.dumps({'params': contents, 'epoch_tag': epoch_tag}) + @bottle.get('/<filename:re:.*\.js>') def javascripts(filename): response.set_header('Cache-Control', "public, max-age=1296000") return bottle.static_file(filename, root='static') + def main(): #bottle.debug(True) #bottle.run(reloader=True)