Mercurial > templog
changeset 228:d9e81a563923
porting to asyncio
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Fri, 20 Mar 2015 20:12:25 +0800 |
parents | efb5cad2e98b |
children | 99255c501e02 |
files | py/config.py py/fridge.py py/gpio.py py/gpio_rpi.py py/gpio_test.py py/params.py py/requirements.txt py/sensor_ds18b20.py py/tempserver.py py/uploader.py |
diffstat | 10 files changed, 113 insertions(+), 95 deletions(-) [+] |
line wrap: on
line diff
--- a/py/config.py Sat Feb 28 00:14:21 2015 +0800 +++ b/py/config.py Fri Mar 20 20:12:25 2015 +0800 @@ -13,7 +13,7 @@ PARAMS_FILE = os.path.join(os.path.dirname(__file__), 'tempserver.conf') SENSOR_BASE_DIR = '/sys/devices/w1_bus_master1' -FRIDGE_GPIO = '/sys/devices/virtual/gpio/gpio17' +FRIDGE_GPIO_PIN = 17 WORT_NAME = '28-0000042cf4dd' FRIDGE_NAME = '28-0000042cccc4' AMBIENT_NAME = '28-0000042c6dbb'
--- a/py/fridge.py Sat Feb 28 00:14:21 2015 +0800 +++ b/py/fridge.py Fri Mar 20 20:12:25 2015 +0800 @@ -1,35 +1,24 @@ # -*- coding: utf-8 -*- +import asyncio + from utils import L,W,E,EX,D import config -import gevent -class Fridge(gevent.Greenlet): +import gpio + +class Fridge(object): OVERSHOOT_MAX_DIV = 1800.0 # 30 mins def __init__(self, server): - gevent.Greenlet.__init__(self) self.server = server - self.setup_gpio() + self.gpio = gpio.Gpio(config.FRIDGE_GPIO_PIN, "fridge") self.wort_valid_clock = 0 self.fridge_on_clock = 0 self.off() - def setup_gpio(self): - dir_fn = '%s/direction' % config.FRIDGE_GPIO - with open(dir_fn, 'w') as f: - f.write('low') - val_fn = '%s/value' % config.FRIDGE_GPIO - # XXX - Fridge should have __enter__/__exit__, close the file there. - self.value_file = open(val_fn, 'r+') - def turn(self, value): - self.value_file.seek(0) - if value: - self.value_file.write('1') - else: - self.value_file.write('0') - self.value_file.flush() + self.gpio.turn(value) def on(self): self.turn(True) @@ -39,22 +28,15 @@ self.fridge_off_clock = self.server.now() def is_on(self): - self.value_file.seek(0) - buf = self.value_file.read().strip() - if buf == '0': - return False - if buf != '1': - E("Bad value read from gpio '%s': '%s'" - % (self.value_file.name, buf)) - return True + return self.gpio.get_state() - # greenlet subclassed - def _run(self): + @asyncio.coroutine + def run(self): if self.server.params.disabled: L("Fridge is disabled") while True: self.do() - self.server.sleep(config.FRIDGE_SLEEP) + yield from self.server.sleep(config.FRIDGE_SLEEP) def do(self): """ this is the main fridge control logic """
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/py/gpio.py Fri Mar 20 20:12:25 2015 +0800 @@ -0,0 +1,4 @@ +try: + from gpio_rpi import * +except ImportError: + from gpio_test import *
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/py/gpio_rpi.py Fri Mar 20 20:12:25 2015 +0800 @@ -0,0 +1,21 @@ +import os + +import RPi.GPIO as GPIO + +from utils import L,D,EX,W + +__all__ = ["Gpio"] + +class Gpio(object): + def __init__(self, pin, name): + self.pin = pin + self.name = name + GPIO.setmode(GPIO.BOARD) + GPIO.setup(self.pin, GPIO.OUT) + + def turn(self, value): + self.state = bool(value) + GPIO.output(self.pin, self.state) + + def get_state(self): + return GPIO.input(self.pin)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/py/gpio_test.py Fri Mar 20 20:12:25 2015 +0800 @@ -0,0 +1,23 @@ +import os + +from utils import L,D,EX,W + +__all__ = ["Gpio"] + +class Gpio(object): + def __init__(self, pin, name): + self.name = name + self.pin = name + self.state = False + L("Test GPIO %s pin %d started, set off." % (name, pin)) + + def turn(self, value): + self.state = bool(value) + onoff = ("off", "on")[int(self.state)] + L("Test GPIO %s pin %d turned %s" % (self.name, self.pin, onoff)) + + def get_state(self): + return self.state + + +
--- a/py/params.py Sat Feb 28 00:14:21 2015 +0800 +++ b/py/params.py Fri Mar 20 20:12:25 2015 +0800 @@ -4,8 +4,6 @@ import signal import StringIO -import gevent - import config from utils import W,L,E,EX
--- a/py/requirements.txt Sat Feb 28 00:14:21 2015 +0800 +++ b/py/requirements.txt Fri Mar 20 20:12:25 2015 +0800 @@ -1,17 +1,9 @@ -argparse==1.2.1 -wsgiref==0.1.2 - -# sha256: v6nYRtuRp9i2o26HNT7tZBx-Pn0L-guZdXltIn8ttOs -gevent==1.0 - -# sha256: sWDlVqIuFrrj8_Y__OeJhoLIA82JZFcZL3tU_nT-mR4 -greenlet==0.4.2 +# sha256: nkIlLxfR3YnuMXReDE--WIYsJRR-sO9SlcnNm8tOosE +lockfile==0.10.2 -# sha256: I9pYnJH1nLfGRNXOXfU51Eg0G9R5kX1t3pc_guJxkUc -lockfile==0.9.1 +# sha256: 2zFqD89UuXAsr2ymGbdr4l1T9e4Hgbr_C7ni4DVfryQ +python-daemon==2.0.5 -# sha256: FmX7Fr_q5y8Wqi3kC8dWYUWL1Ccxp9RjqRGo1er5bAs -python-daemon==1.6 +# sha256: IzjIUGznhTrC3377pzGj_QFafuJWGvqw1p3e-0NAP1o +aiohttp==0.14.4 -# sha256: NkiAJJLpVf_rKPbauGStcUBZ9UOL9nmNgvnUd8ZmrKM -requests==2.3.0
--- a/py/sensor_ds18b20.py Sat Feb 28 00:14:21 2015 +0800 +++ b/py/sensor_ds18b20.py Fri Mar 20 20:12:25 2015 +0800 @@ -2,27 +2,26 @@ import os import re - -import gevent -import gevent.threadpool +import asyncio +import concurrent.futures import config from utils import D,L,W,E,EX -class DS18B20s(gevent.Greenlet): +class DS18B20s(object): THERM_RE = re.compile('.* YES\n.*t=(.*)\n', re.MULTILINE) def __init__(self, server): - gevent.Greenlet.__init__(self) self.server = server - self.readthread = gevent.threadpool.ThreadPool(1) + self.readthread = concurrent.futures.ThreadPoolExecutor(max_workers=1) self.master_dir = config.SENSOR_BASE_DIR + @asyncio.coroutine def do(self): vals = {} for n in self.sensor_names(): - value = self.do_sensor(n) + value = yield from self.do_sensor(n) if value is not None: vals[n] = value @@ -32,26 +31,30 @@ self.server.add_reading(vals) - def _run(self): + @asyncio.coroutine + def run(self): while True: - self.do() - self.server.sleep(config.SENSOR_SLEEP) + yield from self.do() + yield from self.server.sleep(config.SENSOR_SLEEP) + @asyncio.coroutine def read_wait(self, f): - # handles a blocking file read with a gevent threadpool. A - # real python thread performs the read while other gevent - # greenlets keep running. + # handles a blocking file read with a threadpool. A + # real python thread performs the read while other + # asyncio tasks keep running. # the ds18b20 takes ~750ms to read, which is noticable # interactively. - return self.readthread.apply(f.read) + loop = asyncio.get_event_loop() + yield from loop.run_in_executor(self.readthread, f.read) + @asyncio.coroutine def do_sensor(self, s, contents = None): """ contents can be set by the caller for testing """ try: if contents is None: fn = os.path.join(self.master_dir, s, 'w1_slave') f = open(fn, 'r') - contents = self.read_wait(f) + contents = yield from self.read_wait(f) match = self.THERM_RE.match(contents) if match is None:
--- a/py/tempserver.py Sat Feb 28 00:14:21 2015 +0800 +++ b/py/tempserver.py Fri Mar 20 20:12:25 2015 +0800 @@ -5,9 +5,8 @@ import logging import time import signal +import asyncio -import gevent -import gevent.monkey import lockfile.pidlockfile import daemon @@ -25,10 +24,7 @@ self.readings = [] self.current = (None, None) self.fridge = None - self._wakeup = gevent.event.Event() - - # don't patch os, fork() is used by daemonize - gevent.monkey.patch_all(os=False, thread=False) + self._wakeup = asyncio.Event() def __enter__(self): self.params = params.Params() @@ -36,7 +32,7 @@ self.uploader = uploader.Uploader(self) self.params.load() self.set_sensors(sensor_ds18b20.DS18B20s(self)) - gevent.signal(signal.SIGHUP, self._reload_signal) + asyncio.get_event_loop().add_signal_handler(signal.SIGHUP, self._reload_signal) return self def __exit__(self, exc_type, exc_value, traceback): @@ -50,16 +46,17 @@ # XXX do these go here or in __enter_() ? self.start_time = self.now() - self.fridge.start() - self.sensors.start() - self.uploader.start() - - # won't return. - while True: - try: - gevent.sleep(60) - except KeyboardInterrupt: - break + tasks = ( + self.fridge.run(), + self.sensors.run(), + self.uploader.run(), + ) + loop = asyncio.get_event_loop() + try: + loop.run_until_complete(asyncio.wait(tasks)) + # not reached + except KeyboardInterrupt: + pass def now(self): return utils.monotonic_time() @@ -97,7 +94,7 @@ def sleep(self, timeout): """ sleeps for timeout seconds, though wakes if the server's config is updated """ - self._wakeup.wait(timeout) + asyncio.wait_for(self._wakeup, timeout=timeout) def _reload_signal(self): try:
--- a/py/uploader.py Sat Feb 28 00:14:21 2015 +0800 +++ b/py/uploader.py Fri Mar 20 20:12:25 2015 +0800 @@ -3,27 +3,25 @@ import zlib import binascii import logging +import asyncio -import gevent -import requests +import aiohttp import config from utils import L,D,EX,W,E import utils -class Uploader(gevent.Greenlet): +class Uploader(object): def __init__(self, server): - gevent.Greenlet.__init__(self) self.server = server - requests_log = logging.getLogger("requests") - requests_log.setLevel(logging.WARNING) - - def _run(self): - gevent.sleep(5) + @asyncio.coroutine + def run(self): + # wait for the first read + yield from asyncio.sleep(5) while True: - self.do() - self.server.sleep(config.UPLOAD_SLEEP) + yield from self.do() + yield from self.server.sleep(config.UPLOAD_SLEEP) def get_tosend(self, readings): tosend = {} @@ -43,26 +41,26 @@ return tosend + @asyncio.coroutine def send(self, tosend): js = json.dumps(tosend) js_enc = binascii.b2a_base64(zlib.compress(js)) mac = hmac.new(config.HMAC_KEY, js_enc).hexdigest() send_data = {'data': js_enc, 'hmac': mac} - r = requests.post(config.UPDATE_URL, data=send_data) - result = r.text + r = yield from asyncio.wait_for(aiohttp.request('post', config.UPDATE_URL, data=send_data), 60) + result = yield from asyncio.wait_for(r.text(), 60) if result != 'OK': raise Exception("Server returned %s" % result) + @asyncio.coroutine def do(self): readings = self.server.take_readings() try: tosend = self.get_tosend(readings) nreadings = len(readings) - self.send(tosend) + yield from self.send(tosend) readings = None D("Sent updated %d readings" % nreadings) - except requests.exceptions.RequestException, e: - E("Error in uploader: %s" % str(e)) except Exception, e: EX("Error in uploader: %s" % str(e)) finally: