Mercurial > dropbear
view test/test_dropbear.py @ 1861:2b3a8026a6ce
Add re-exec for server
This allows ASLR to re-randomize the address
space for every connection, preventing some
vulnerabilities from being exploitable by
repeated probing.
Overhead (memory and time) is yet to be confirmed.
At present this is only enabled on Linux. Other BSD platforms
with fexecve() would probably also work though have not been tested.
author | Matt Johnston <matt@ucc.asn.au> |
---|---|
date | Sun, 30 Jan 2022 10:14:56 +0800 |
parents | e0c1825c567d |
children | b550845e500b |
line wrap: on
line source
import subprocess import os import pty import tempfile import logging import time import socketserver import threading import queue import pytest LOCALADDR="127.0.5.5" @pytest.fixture(scope="module") def dropbear(request): opt = request.config.option if opt.remote: yield None return args = [opt.dropbear, "-p", LOCALADDR, # bind locally only "-r", opt.hostkey, "-p", opt.port, "-F", "-E", ] p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True) # Wait until it has started listening for l in p.stderr: if "Not backgrounding" in l: break # Check it's still running assert p.poll() is None # Ready yield p p.terminate() print("Terminated dropbear. Flushing output:") for l in p.stderr: print(l.rstrip()) print("Done") def dbclient(request, *args, **kwargs): opt = request.config.option host = opt.remote or LOCALADDR base_args = [opt.dbclient, "-y", host, "-p", opt.port] if opt.user: full_args.extend(['-l', opt.user]) full_args = base_args + list(args) bg = kwargs.get("background") if "background" in kwargs: del kwargs["background"] if bg: return subprocess.Popen(full_args, **kwargs) else: kwargs.setdefault("timeout", 10) # wait for response return subprocess.run(full_args, **kwargs) class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer): """ Listens for a single incoming request, sends a response if given, and returns the inbound data. Reponse can be a queue object, in which case each item in the queue will be sent as a response, until it receives a None item. """ def __init__(self, port, timeout, response=None): super().__init__(('localhost', port), self.Handler) self.port = port self.timeout = timeout self.response = response self.sink = None class Handler(socketserver.StreamRequestHandler): def handle(self): if isinstance(self.server.response, queue.Queue): while True: i = self.server.response.get() if i is None: break self.wfile.write(i) elif self.server.response: self.wfile.write(self.server.response) assert self.server.sink is None, ">1 request sent to handler" self.server.sink = self.rfile.read() def __enter__(self): self.server_thread = threading.Thread(target=self.serve_forever) self.server_thread.daemon = True self.server_thread.start() return self def __exit__(self, *exc_stuff): self.shutdown() self.server_thread.join() def inbound(self): """ Returns the data sent to the socket """ return self.sink def readall_socket(sock): b = [] while True: i = sock.recv(4096) if not i: break b.append(i) return b''.join(b) # returns a str def random_alnum(size): r = os.urandom(500 + size*5) return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()