changeset 228:d9e81a563923

porting to asyncio
author Matt Johnston <matt@ucc.asn.au>
date Fri, 20 Mar 2015 20:12:25 +0800
parents efb5cad2e98b
children 99255c501e02
files py/config.py py/fridge.py py/gpio.py py/gpio_rpi.py py/gpio_test.py py/params.py py/requirements.txt py/sensor_ds18b20.py py/tempserver.py py/uploader.py
diffstat 10 files changed, 113 insertions(+), 95 deletions(-) [+]
line wrap: on
line diff
--- a/py/config.py	Sat Feb 28 00:14:21 2015 +0800
+++ b/py/config.py	Fri Mar 20 20:12:25 2015 +0800
@@ -13,7 +13,7 @@
 PARAMS_FILE = os.path.join(os.path.dirname(__file__), 'tempserver.conf')
 
 SENSOR_BASE_DIR = '/sys/devices/w1_bus_master1'
-FRIDGE_GPIO = '/sys/devices/virtual/gpio/gpio17'
+FRIDGE_GPIO_PIN = 17
 WORT_NAME = '28-0000042cf4dd'
 FRIDGE_NAME = '28-0000042cccc4'
 AMBIENT_NAME = '28-0000042c6dbb'
--- a/py/fridge.py	Sat Feb 28 00:14:21 2015 +0800
+++ b/py/fridge.py	Fri Mar 20 20:12:25 2015 +0800
@@ -1,35 +1,24 @@
 # -*- coding: utf-8 -*-
+import asyncio
+
 from utils import L,W,E,EX,D
 import config
-import gevent
 
-class Fridge(gevent.Greenlet):
+import gpio
+
+class Fridge(object):
 
     OVERSHOOT_MAX_DIV = 1800.0 # 30 mins
 
     def __init__(self, server):
-        gevent.Greenlet.__init__(self)
         self.server = server
-        self.setup_gpio()
+        self.gpio = gpio.Gpio(config.FRIDGE_GPIO_PIN, "fridge")
         self.wort_valid_clock = 0
         self.fridge_on_clock = 0
         self.off()
 
-    def setup_gpio(self):
-        dir_fn = '%s/direction' % config.FRIDGE_GPIO
-        with open(dir_fn, 'w') as f:
-            f.write('low')
-        val_fn = '%s/value' % config.FRIDGE_GPIO
-        # XXX - Fridge should have __enter__/__exit__, close the file there.
-        self.value_file = open(val_fn, 'r+')
-
     def turn(self, value):
-        self.value_file.seek(0)
-        if value:
-            self.value_file.write('1')
-        else:
-            self.value_file.write('0')
-        self.value_file.flush()
+        self.gpio.turn(value)
 
     def on(self):
         self.turn(True)
@@ -39,22 +28,15 @@
         self.fridge_off_clock = self.server.now()
 
     def is_on(self):
-        self.value_file.seek(0)
-        buf = self.value_file.read().strip()
-        if buf == '0':
-            return False
-        if buf != '1':
-            E("Bad value read from gpio '%s': '%s'" 
-                % (self.value_file.name, buf))
-        return True
+        return self.gpio.get_state()
 
-    # greenlet subclassed
-    def _run(self):
+    @asyncio.coroutine
+    def run(self):
         if self.server.params.disabled:
             L("Fridge is disabled")
         while True:
             self.do()
-            self.server.sleep(config.FRIDGE_SLEEP)
+            yield from self.server.sleep(config.FRIDGE_SLEEP)
 
     def do(self):
         """ this is the main fridge control logic """
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/py/gpio.py	Fri Mar 20 20:12:25 2015 +0800
@@ -0,0 +1,4 @@
+try:
+	from gpio_rpi import *
+except ImportError:
+	from gpio_test import *
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/py/gpio_rpi.py	Fri Mar 20 20:12:25 2015 +0800
@@ -0,0 +1,21 @@
+import os
+
+import RPi.GPIO as GPIO
+
+from utils import L,D,EX,W
+
+__all__ = ["Gpio"]
+
+class Gpio(object):
+	def __init__(self, pin, name):
+		self.pin = pin
+		self.name = name
+		GPIO.setmode(GPIO.BOARD)
+		GPIO.setup(self.pin, GPIO.OUT)
+
+	def turn(self, value):
+		self.state = bool(value)
+		GPIO.output(self.pin, self.state)
+
+	def get_state(self):
+		return GPIO.input(self.pin)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/py/gpio_test.py	Fri Mar 20 20:12:25 2015 +0800
@@ -0,0 +1,23 @@
+import os
+
+from utils import L,D,EX,W
+
+__all__ = ["Gpio"]
+
+class Gpio(object):
+	def __init__(self, pin, name):
+		self.name = name
+		self.pin = name
+		self.state = False
+		L("Test GPIO %s pin %d started, set off." % (name, pin))
+
+	def turn(self, value):
+		self.state = bool(value)
+		onoff = ("off", "on")[int(self.state)]
+		L("Test GPIO %s pin %d turned %s" % (self.name, self.pin, onoff))
+
+	def get_state(self):
+		return self.state
+		
+
+	
--- a/py/params.py	Sat Feb 28 00:14:21 2015 +0800
+++ b/py/params.py	Fri Mar 20 20:12:25 2015 +0800
@@ -4,8 +4,6 @@
 import signal
 import StringIO
 
-import gevent
-
 import config
 from utils import W,L,E,EX
 
--- a/py/requirements.txt	Sat Feb 28 00:14:21 2015 +0800
+++ b/py/requirements.txt	Fri Mar 20 20:12:25 2015 +0800
@@ -1,17 +1,9 @@
-argparse==1.2.1
-wsgiref==0.1.2
-
-# sha256: v6nYRtuRp9i2o26HNT7tZBx-Pn0L-guZdXltIn8ttOs
-gevent==1.0
-
-# sha256: sWDlVqIuFrrj8_Y__OeJhoLIA82JZFcZL3tU_nT-mR4
-greenlet==0.4.2
+# sha256: nkIlLxfR3YnuMXReDE--WIYsJRR-sO9SlcnNm8tOosE
+lockfile==0.10.2
 
-# sha256: I9pYnJH1nLfGRNXOXfU51Eg0G9R5kX1t3pc_guJxkUc
-lockfile==0.9.1
+# sha256: 2zFqD89UuXAsr2ymGbdr4l1T9e4Hgbr_C7ni4DVfryQ
+python-daemon==2.0.5
 
-# sha256: FmX7Fr_q5y8Wqi3kC8dWYUWL1Ccxp9RjqRGo1er5bAs
-python-daemon==1.6
+# sha256: IzjIUGznhTrC3377pzGj_QFafuJWGvqw1p3e-0NAP1o
+aiohttp==0.14.4
 
-# sha256: NkiAJJLpVf_rKPbauGStcUBZ9UOL9nmNgvnUd8ZmrKM
-requests==2.3.0
--- a/py/sensor_ds18b20.py	Sat Feb 28 00:14:21 2015 +0800
+++ b/py/sensor_ds18b20.py	Fri Mar 20 20:12:25 2015 +0800
@@ -2,27 +2,26 @@
 
 import os
 import re
-
-import gevent
-import gevent.threadpool
+import asyncio
+import concurrent.futures
 
 import config
 from utils import D,L,W,E,EX
 
-class DS18B20s(gevent.Greenlet):
+class DS18B20s(object):
 
     THERM_RE = re.compile('.* YES\n.*t=(.*)\n', re.MULTILINE)
 
     def __init__(self, server):
-        gevent.Greenlet.__init__(self)
         self.server = server
-        self.readthread = gevent.threadpool.ThreadPool(1)
+        self.readthread = concurrent.futures.ThreadPoolExecutor(max_workers=1)
         self.master_dir = config.SENSOR_BASE_DIR
 
+    @asyncio.coroutine
     def do(self):
         vals = {}
         for n in self.sensor_names():
-                value = self.do_sensor(n)
+                value = yield from self.do_sensor(n)
                 if value is not None:
                     vals[n] = value
 
@@ -32,26 +31,30 @@
 
         self.server.add_reading(vals)
 
-    def _run(self):
+    @asyncio.coroutine
+    def run(self):
         while True:
-            self.do()
-            self.server.sleep(config.SENSOR_SLEEP)
+            yield from self.do()
+            yield from self.server.sleep(config.SENSOR_SLEEP)
 
+    @asyncio.coroutine
     def read_wait(self, f):
-        # handles a blocking file read with a gevent threadpool. A
-        # real python thread performs the read while other gevent
-        # greenlets keep running.
+        # handles a blocking file read with a threadpool. A
+        # real python thread performs the read while other 
+        # asyncio tasks keep running.
         # the ds18b20 takes ~750ms to read, which is noticable
         # interactively.
-        return self.readthread.apply(f.read)
+        loop = asyncio.get_event_loop()
+        yield from loop.run_in_executor(self.readthread, f.read)
 
+    @asyncio.coroutine
     def do_sensor(self, s, contents = None):
         """ contents can be set by the caller for testing """
         try:
             if contents is None:
                 fn = os.path.join(self.master_dir, s, 'w1_slave')
                 f = open(fn, 'r')
-                contents = self.read_wait(f)
+                contents = yield from self.read_wait(f)
 
             match = self.THERM_RE.match(contents)
             if match is None:
--- a/py/tempserver.py	Sat Feb 28 00:14:21 2015 +0800
+++ b/py/tempserver.py	Fri Mar 20 20:12:25 2015 +0800
@@ -5,9 +5,8 @@
 import logging
 import time
 import signal
+import asyncio
 
-import gevent
-import gevent.monkey
 import lockfile.pidlockfile
 import daemon
 
@@ -25,10 +24,7 @@
         self.readings = []
         self.current = (None, None)
         self.fridge = None
-        self._wakeup = gevent.event.Event()
-
-        # don't patch os, fork() is used by daemonize
-        gevent.monkey.patch_all(os=False, thread=False)
+        self._wakeup = asyncio.Event()
 
     def __enter__(self):
         self.params = params.Params()
@@ -36,7 +32,7 @@
         self.uploader = uploader.Uploader(self)
         self.params.load()
         self.set_sensors(sensor_ds18b20.DS18B20s(self))
-        gevent.signal(signal.SIGHUP, self._reload_signal)
+        asyncio.get_event_loop().add_signal_handler(signal.SIGHUP, self._reload_signal)
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
@@ -50,16 +46,17 @@
 
         # XXX do these go here or in __enter_() ?
         self.start_time = self.now()
-        self.fridge.start()
-        self.sensors.start()
-        self.uploader.start()
-
-        # won't return.
-        while True:
-            try:
-                gevent.sleep(60)
-            except KeyboardInterrupt:
-                break
+        tasks = (
+            self.fridge.run(),
+            self.sensors.run(),
+            self.uploader.run(),
+        )
+        loop = asyncio.get_event_loop()
+        try:
+            loop.run_until_complete(asyncio.wait(tasks))
+            # not reached
+        except KeyboardInterrupt:
+            pass
 
     def now(self):
         return utils.monotonic_time()
@@ -97,7 +94,7 @@
 
     def sleep(self, timeout):
         """ sleeps for timeout seconds, though wakes if the server's config is updated """
-        self._wakeup.wait(timeout)
+        asyncio.wait_for(self._wakeup, timeout=timeout)
         
     def _reload_signal(self):
         try:
--- a/py/uploader.py	Sat Feb 28 00:14:21 2015 +0800
+++ b/py/uploader.py	Fri Mar 20 20:12:25 2015 +0800
@@ -3,27 +3,25 @@
 import zlib
 import binascii
 import logging
+import asyncio
 
-import gevent
-import requests
+import aiohttp
 
 import config
 from utils import L,D,EX,W,E
 import utils
 
-class Uploader(gevent.Greenlet):
+class Uploader(object):
     def __init__(self, server):
-        gevent.Greenlet.__init__(self)
         self.server = server
 
-        requests_log = logging.getLogger("requests")
-        requests_log.setLevel(logging.WARNING)
-
-    def _run(self):
-        gevent.sleep(5)
+    @asyncio.coroutine
+    def run(self):
+        # wait for the first read
+        yield from asyncio.sleep(5)
         while True:
-            self.do()
-            self.server.sleep(config.UPLOAD_SLEEP)
+            yield from self.do()
+            yield from self.server.sleep(config.UPLOAD_SLEEP)
 
     def get_tosend(self, readings):
         tosend = {}
@@ -43,26 +41,26 @@
 
         return tosend
 
+    @asyncio.coroutine
     def send(self, tosend):
         js = json.dumps(tosend)
         js_enc = binascii.b2a_base64(zlib.compress(js))
         mac = hmac.new(config.HMAC_KEY, js_enc).hexdigest()
         send_data = {'data': js_enc, 'hmac': mac}
-        r = requests.post(config.UPDATE_URL, data=send_data)
-        result = r.text
+        r = yield from asyncio.wait_for(aiohttp.request('post', config.UPDATE_URL, data=send_data), 60)
+        result = yield from asyncio.wait_for(r.text(), 60)
         if result != 'OK':
             raise Exception("Server returned %s" % result)
 
+    @asyncio.coroutine
     def do(self):
         readings = self.server.take_readings()
         try:
             tosend = self.get_tosend(readings)
             nreadings = len(readings)
-            self.send(tosend)
+            yield from self.send(tosend)
             readings = None
             D("Sent updated %d readings" % nreadings)
-        except requests.exceptions.RequestException, e:
-            E("Error in uploader: %s" % str(e))
         except Exception, e:
             EX("Error in uploader: %s" % str(e))
         finally: