changeset 551:9499bd2f344b

long polling config updates
author Matt Johnston <matt@ucc.asn.au>
date Mon, 08 Jun 2015 22:29:46 +0800
parents 1040946133ea
children d16afb5b5cd9
files py/config.py py/configwaiter.py py/params.py py/tempserver.py py/uploader.py py/utils.py web/log.py web/settings.py web/templog.py
diffstat 9 files changed, 198 insertions(+), 38 deletions(-) [+]
line wrap: on
line diff
--- a/py/config.py	Wed May 27 23:46:06 2015 +0800
+++ b/py/config.py	Mon Jun 08 22:29:46 2015 +0800
@@ -20,8 +20,9 @@
 INTERNAL_TEMPERATURE = '/sys/class/thermal/thermal_zone0/temp'
 
 HMAC_KEY = "a key"
-#UPDATE_URL = 'https://matt.ucc.asn.au/test/templog/update'
-UPDATE_URL = 'https://evil.ucc.asn.au/~matt/templog/update'
+SERVER_URL = 'https://evil.ucc.asn.au/~matt/templog/update'
+UPDATE_URL = "%s/update" % SERVER_URL
+SETTINGS_URL = "%s/update" % SERVER_URL
 
 # site-local values overridden in localconfig, eg WORT_NAME, HMAC_KEY
 try:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/py/configwaiter.py	Mon Jun 08 22:29:46 2015 +0800
@@ -0,0 +1,43 @@
+class ConfigWaiter(object):
+	""" Waits for config updates from the server. http long polling """
+
+	def __init__(self, server):
+		self.server = server
+		self.epoch_tag = None
+		self.http_session = aiohttp.ClientSession()
+
+	@asyncio.coroutine
+	def run(self):
+		# wait until someting has been uploaded (the uploader itself waits 5 seconds)
+		yield from asyncio.sleep(10)
+		while True:
+			yield from self.do()
+
+			# avoid spinning too fast
+			yield from server.sleep(1)
+
+	@asyncio.coroutine
+	def do(self):
+		try:
+			if self.epoch_tag:
+				headers = {'etag': self.epoch_tag}
+			else:
+				headers = None
+
+	        r = yield from asyncio.wait_for(
+	        	self.http_session.get(config.SETTINGS_URL, headers=headers), 
+	        	300)
+	        if r.status == 200:
+		        resp = yield from asyncio.wait_for(r.json(), 300)
+
+		        self.epoch_tag = resp['epoch_tag']
+		        epoch = self.epoch_tag.split('-')[0]
+		        if self.server.params.receive(resp['params'], epoch):
+		        	self.server.reload_signal(True)
+
+		 except Exception as e:
+		 	E("Error watching config: %s" % str(e))
+
+
+
+
--- a/py/params.py	Wed May 27 23:46:06 2015 +0800
+++ b/py/params.py	Mon Jun 08 22:29:46 2015 +0800
@@ -3,9 +3,11 @@
 import json
 import signal
 import io
+import tempfile
 
 import config
 from utils import W,L,E,EX
+import utils
 
 _FIELD_DEFAULTS = {
     'fridge_setpoint': 16,
@@ -24,6 +26,7 @@
 
     def __init__(self):
         self.update(_FIELD_DEFAULTS)
+        self._epoch = None
 
     def __getattr__(self, k):
         return self[k]
@@ -45,6 +48,7 @@
             if k not in self:
                 raise self.Error("Unknown parameter %s=%s in file '%s'" % (str(k), str(u[k]), getattr(f, 'name', '???')))
         self.update(u)
+        self._epoch = utils.hexnonce()
 
         L("Loaded parameters")
         L(self.save_string())
@@ -60,19 +64,54 @@
                     W("Missing parameter file, using defaults. %s" % str(e))
                     return
 
-    def _do_save(self, f):
-        json.dump(self, f, sort_keys=True, indent=4)
-        f.write('\n')
-        f.flush()
+    def get_epoch(self):
+        return self._epoch
+
+    def receive(self, params, epoch):
+        """ updates parameters from the server. does some validation,
+        writes config file to disk.
+        Returns True on success, False failure 
+        """
+
+        if epoch != self._epoch:
+            return
+
+        def same_type(a, b):
+            ta = type(a)
+            tb = type(b)
+
+            if ta == int:
+                ta = float
+            if tb == int:
+                tb = float
+
+            return ta == tb
 
-    def save(self, f = None):
-        if f:
-            return self._do_save(f)
-        else:
-            with file(config.PARAMS_FILE, 'w') as f:
-                return self._do_save(f)
+        if self.keys() != new_params.keys():
+            diff = self.keys() ^ new_params.keys()
+            E("Mismatching params, %s" % str(diff))
+            return False
+
+        for k, v in new_params.items():
+            if not same_type(v, self[k]):
+                E("Bad type for %s" % k)
+                return False
+
+        dir = os.path.dirname(config.PARAMS_FILE)
+        try:
+            t = tempfile.NamedTemporaryFile(prefix='config',
+                dir = dir,
+                delete = False)
+
+            t.write(json.dumps(new_params, sort_keys=True, indent=4)+'\n')
+            name = t.name
+            t.close()
+
+            os.rename(name, config.PARAMS_FILE)
+            return True
+        except Exception as e:
+            E("Problem: %s" % e)
+            return False
 
     def save_string(self):
-        s = io.StringIO()
-        self.save(s)
-        return s.getvalue()
+        return json.dumps(self, sort_keys=True, indent=4)
--- a/py/tempserver.py	Wed May 27 23:46:06 2015 +0800
+++ b/py/tempserver.py	Mon Jun 08 22:29:46 2015 +0800
@@ -32,9 +32,10 @@
         self.params = params.Params()
         self.fridge = fridge.Fridge(self)
         self.uploader = uploader.Uploader(self)
+        self.configwaiter = configwaiter.ConfigWaiter(self)
         self.params.load()
         self.set_sensors(sensor.make_sensor(self))
-        asyncio.get_event_loop().add_signal_handler(signal.SIGHUP, self._reload_signal)
+        asyncio.get_event_loop().add_signal_handler(signal.SIGHUP, self.reload_signal)
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
@@ -52,6 +53,7 @@
             self.fridge.run(),
             self.sensors.run(),
             self.uploader.run(),
+            self.configwaiter.run(),
         ]
 
         loop = asyncio.get_event_loop()
@@ -109,10 +111,11 @@
         except asyncio.TimeoutError:
             pass
 
-    def _reload_signal(self):
+    def reload_signal(self, no_file = False):
         try:
-            self.params.load()
-            L("Reloaded.")
+            if not no_file:
+                self.params.load()
+                L("Reloaded.")
             self._wakeup.set()
             self._wakeup.clear()
         except Error as e:
--- a/py/uploader.py	Wed May 27 23:46:06 2015 +0800
+++ b/py/uploader.py	Mon Jun 08 22:29:46 2015 +0800
@@ -36,6 +36,7 @@
         tosend['fridge_name'] = self.server.wort_name
 
         tosend['current_params'] = dict(self.server.params)
+        tosend['current_params_epoch'] = self.server.params.get_epoch()
 
         tosend['start_time'] = self.server.start_time
         tosend['uptime'] = utils.uptime()
@@ -53,7 +54,7 @@
         send_data = {'data': js_enc.decode(), 'hmac': mac}
         r = yield from asyncio.wait_for(aiohttp.request('post', config.UPDATE_URL, data=send_data), 60)
         result = yield from asyncio.wait_for(r.text(), 60)
-        if result != 'OK':
+        if r.status == 200 and result != 'OK':
             raise Exception("Server returned %s" % result)
 
     @asyncio.coroutine
--- a/py/utils.py	Wed May 27 23:46:06 2015 +0800
+++ b/py/utils.py	Mon Jun 08 22:29:46 2015 +0800
@@ -4,6 +4,7 @@
 import time
 import select
 import logging
+import binascii
 
 D = logging.debug
 L = logging.info
@@ -133,3 +134,5 @@
     except Exception as e:
         return -1
 
+def hexnonce():
+    return binascii.hexlify(os.urandom(120))
--- a/web/log.py	Wed May 27 23:46:06 2015 +0800
+++ b/web/log.py	Mon Jun 08 22:29:46 2015 +0800
@@ -20,6 +20,9 @@
 
 import config
 import atomicfile
+import settings
+
+fridge_settings = settings.Settings()
 
 def sensor_rrd_path(s):
     return '%s/sensor_%s.rrd' % (config.DATA_PATH, str(s))
@@ -245,16 +248,11 @@
     return val_ticks + float(val_rem) * tick_secs / tick_wake
 
 def write_current_params(current_params):
-    out = {}
-    out['params'] = current_params
-    out['time'] = time.time()
-    atomicfile.AtomicFile("%s/current_params.txt" % config.DATA_PATH).write(
-        json.dumps(out, sort_keys=True, indent=4)+'\n')
+    fridge_settings.update(current_params)
 
 def read_current_params():
-    p = atomicfile.AtomicFile("%s/current_params.txt" % config.DATA_PATH).read()
-    dat = json.loads(p)
-    return dat['params']
+    params, epochtag = fridge_settings.get()
+    return params
 
 def parse(params):
 
@@ -278,10 +276,11 @@
 
     # one-off measurements here
     current_params = params['current_params']
+    current_epoch = params['current_params_epoch']
     measurements['fridge_on'] = [ (time.time(), params['fridge_on']) ]
     measurements['fridge_setpoint'] = [ (time.time(), current_params['fridge_setpoint']) ]
 
-    write_current_params(current_params)
+    write_current_params(current_params, current_epoch)
 
     for s, vs in measurements.iteritems():
         sensor_update(s, vs)
@@ -371,10 +370,7 @@
         if not same_type(v, _FIELD_DEFAULTS[k]):
             return "Bad type for %s, %s vs %s" % (k , type(v), type(_FIELD_DEFAULTS[k]))
 
-    ret = send_params(params) 
-    if ret is not True:
-        return "Failed sending params: %s" % ret
-
+    fridge_settings.update(params)
     return True
 
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/settings.py	Mon Jun 08 22:29:46 2015 +0800
@@ -0,0 +1,61 @@
+import gevent
+import fcntl
+import hashlib
+
+class Settings(object):
+    RAND_SIZE = 15 # 120 bits
+
+    """ Handles state updates from both the web UI and from the fridge client.
+    The fridge client is canonical. It provides the epoch (apart from 'startepoch'), that
+    is changed any time the fridge reloads its local config. The fridge only accepts
+    updates that have the same epoch.
+
+    When the web UI changes it keeps the same epoch but generates a new tag. The fridge sends
+    its current known tag and waits for it to change.
+
+    content is opaque, presently a dictionary of decoded json 
+    """
+
+    def __init__(self):
+        self.event = gevent.event.Event()
+        self.contents = None
+        self.epoch = None
+        self.tag = None
+
+        self.update(self, None, 'startepoch')
+
+    def wait(self, epoch_tag = None, timeout = None):
+        """ returns false if the timeout was hit """
+        if self.epoch_tag() != epoch_tag:
+            # has alredy changed
+            return True
+        return self.event.wait(timeout)
+
+    def epoch_tag(self):
+        return '%s-%s' % (self.epoch, self.tag)
+
+    def random(self):
+        return binascii.hexlify(os.urandom(self.RAND_SIZE))
+
+    def update(self, contents, epoch = None):
+        """ replaces settings contents and updates waiters if changed """
+        if epoch:
+            if self.epoch == epoch:
+                return
+            else:
+                self.epoch = epoch
+
+        self.tag = self.random()
+        self.contents = contents
+
+        self.event.set()
+        self.event.clear()
+
+    def get(self):
+        """ Returns (contents, epoch-tag) """
+        return self.contents, self.epoch_tag()
+
+
+
+
+
--- a/web/templog.py	Wed May 27 23:46:06 2015 +0800
+++ b/web/templog.py	Mon Jun 08 22:29:46 2015 +0800
@@ -28,9 +28,8 @@
     def run(*args, **argm):
         argm['server'] = 'gevent'
         super(TemplogBottle, self).run(*args, **argm)
-        print "ran custom bottle"
 
-#bottle.default_app.push(TemplogBottle())
+bottle.default_app.push(TemplogBottle())
 
 secure.setup_csrf()
 
@@ -62,7 +61,6 @@
 def encode_data(data, mimetype):
     return 'data:%s;base64,%s' % (mimetype, binascii.b2a_base64(data).rstrip())
 
-
 @route('/graph.png')
 def graph():
     response.set_header('Content-Type', 'image/png')
@@ -76,12 +74,12 @@
     csrf_blob = post_json['csrf_blob']
 
     if not secure.check_csrf_blob(csrf_blob):
-        bottle.response.status = 403
+        response.status = 403
         return "Bad csrf"
 
     ret = log.update_params(post_json['params'])
     if not ret is True:
-        bottle.response.status = 403
+        response.status = 403
         return ret
         
     return "Good"
@@ -159,11 +157,26 @@
     #var_lookup = environ['mod_ssl.var_lookup']
     #return var_lookup("SSL_SERVER_I_DN_O")
 
+@route('/get_settings')
+def get_settings():
+    response.set_header('Cache-Control', 'no-cache')
+    req_etag = request.headers.get('etag', None)
+    if req_etag:
+        # wait for it to change
+        if not log.fridge_settings.wait(req_etag, timeout=LONG_POLL_TIMEOUT):
+            response.status = 304
+            return "Nothing happened"
+
+    response.set_header('Content-Type', 'application/json')
+    contents, epoch_tag = client_settings.get()
+    return json.dumps({'params': contents, 'epoch_tag': epoch_tag})
+
 @bottle.get('/<filename:re:.*\.js>')
 def javascripts(filename):
     response.set_header('Cache-Control', "public, max-age=1296000")
     return bottle.static_file(filename, root='static')
 
+
 def main():
     #bottle.debug(True)
     #bottle.run(reloader=True)