Mercurial > dropbear
diff test/test_dropbear.py @ 1836:06c7ddbb9dd6
Add first channel tests
These initial tests are checking various edge cases of channel handling
that have cropped up over the years.
author | Matt Johnston <matt@codeconstruct.com.au> |
---|---|
date | Mon, 18 Oct 2021 14:22:37 +0800 |
parents | |
children | d757f48ae29f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/test_dropbear.py Mon Oct 18 14:22:37 2021 +0800 @@ -0,0 +1,109 @@ +import subprocess +import os +import pty +import tempfile +import logging +import time +import socketserver +import threading +import queue + +import pytest + +LOCALADDR="127.0.5.5" + [email protected](scope="module") +def dropbear(request): + opt = request.config.option + if opt.remote: + yield None + return + + args = [opt.dropbear, + "-p", LOCALADDR, # bind locally only + "-r", opt.hostkey, + "-p", opt.port, + "-F", "-E", + ] + p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True) + # Wait until it has started listening + for l in p.stderr: + if "Not backgrounding" in l: + break + # Check it's still running + assert p.poll() is None + # Ready + yield p + p.terminate() + +def dbclient(request, *args, **kwargs): + opt = request.config.option + host = opt.remote or LOCALADDR + base_args = [opt.dbclient, "-y", host, "-p", opt.port] + if opt.user: + full_args.extend(['-l', opt.user]) + full_args = base_args + list(args) + bg = kwargs.get("background") + if "background" in kwargs: + del kwargs["background"] + if bg: + return subprocess.Popen(full_args, **kwargs) + else: + # wait for response + return subprocess.run(full_args, **kwargs) + +class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer): + """ Listens for a single incoming request, sends a response if given, + and returns the inbound data. + Reponse can be a queue object, in which case each item in the queue will + be sent as a response, until it receives a None item. + """ + def __init__(self, port, timeout, response=None): + super().__init__(('localhost', port), self.Handler) + self.port = port + self.timeout = timeout + self.response = response + self.sink = None + + class Handler(socketserver.StreamRequestHandler): + def handle(self): + if isinstance(self.server.response, queue.Queue): + while True: + i = self.server.response.get() + if i is None: + break + self.wfile.write(i) + elif self.server.response: + self.wfile.write(self.server.response) + assert self.server.sink is None, ">1 request sent to handler" + self.server.sink = self.rfile.read() + + def __enter__(self): + self.server_thread = threading.Thread(target=self.serve_forever) + self.server_thread.daemon = True + self.server_thread.start() + return self + + def __exit__(self, *exc_stuff): + self.shutdown() + self.server_thread.join() + + def inbound(self): + """ Returns the data sent to the socket """ + return self.sink + +def readall_socket(sock): + b = [] + while True: + i = sock.recv(4096) + if not i: + break + b.append(i) + return b''.join(b) + +# returns a str +def random_alnum(size): + r = os.urandom(500 + size*5) + return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode() + +