Mercurial > dropbear
view test/test_dropbear.py @ 1857:6022df862942
Use DSCP for IP QoS traffic classes
The previous TOS values are deprecated and not used by modern traffic
classifiers. This sets AF21 for "interactive" traffic (with a tty).
Non-tty traffic sets AF11 - that indicates high throughput but is not
lowest priority (which would be CS1 or LE).
This differs from the CS1 used by OpenSSH, it lets interactive git over SSH
have higher priority than background least effort traffic. Dropbear's settings
here should be suitable with the diffservs used by CAKE qdisc.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Tue, 25 Jan 2022 17:32:20 +0800 |
parents | e0c1825c567d |
children | b550845e500b |
line wrap: on
line source
import subprocess import os import pty import tempfile import logging import time import socketserver import threading import queue import pytest LOCALADDR="127.0.5.5" @pytest.fixture(scope="module") def dropbear(request): opt = request.config.option if opt.remote: yield None return args = [opt.dropbear, "-p", LOCALADDR, # bind locally only "-r", opt.hostkey, "-p", opt.port, "-F", "-E", ] p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True) # Wait until it has started listening for l in p.stderr: if "Not backgrounding" in l: break # Check it's still running assert p.poll() is None # Ready yield p p.terminate() print("Terminated dropbear. Flushing output:") for l in p.stderr: print(l.rstrip()) print("Done") def dbclient(request, *args, **kwargs): opt = request.config.option host = opt.remote or LOCALADDR base_args = [opt.dbclient, "-y", host, "-p", opt.port] if opt.user: full_args.extend(['-l', opt.user]) full_args = base_args + list(args) bg = kwargs.get("background") if "background" in kwargs: del kwargs["background"] if bg: return subprocess.Popen(full_args, **kwargs) else: kwargs.setdefault("timeout", 10) # wait for response return subprocess.run(full_args, **kwargs) class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer): """ Listens for a single incoming request, sends a response if given, and returns the inbound data. Reponse can be a queue object, in which case each item in the queue will be sent as a response, until it receives a None item. """ def __init__(self, port, timeout, response=None): super().__init__(('localhost', port), self.Handler) self.port = port self.timeout = timeout self.response = response self.sink = None class Handler(socketserver.StreamRequestHandler): def handle(self): if isinstance(self.server.response, queue.Queue): while True: i = self.server.response.get() if i is None: break self.wfile.write(i) elif self.server.response: self.wfile.write(self.server.response) assert self.server.sink is None, ">1 request sent to handler" self.server.sink = self.rfile.read() def __enter__(self): self.server_thread = threading.Thread(target=self.serve_forever) self.server_thread.daemon = True self.server_thread.start() return self def __exit__(self, *exc_stuff): self.shutdown() self.server_thread.join() def inbound(self): """ Returns the data sent to the socket """ return self.sink def readall_socket(sock): b = [] while True: i = sock.recv(4096) if not i: break b.append(i) return b''.join(b) # returns a str def random_alnum(size): r = os.urandom(500 + size*5) return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()