Mercurial > dropbear
view test/test_dropbear.py @ 1855:35d504d59c05
Implement server-side support for sk-ecdsa U2F-backed keys (#142)
* Implement server-side support for sk-ecdsa U2F-backed keys
* Fix out-of-bounds read on normal ecdsa-sha2-[identifier] keys
* Fix one more potential out-of-bounds read
* Check if nistp256 curve is used in sk-ecdsa-sha2- key
It's the only allowed curve per PROTOCOL.u2f specification
* Implement server-side support for sk-ed25519 FIDO2-backed keys
* Keys with type sk-* make no sense as host keys, so they should be
disabled
* fix typo
* Make sk-ecdsa call buf_ecdsa_verify
This reduces code duplication, the SK code just handles the
different message format.
* Reduce sk specific code
The application id can be stored in signkey, then we don't need
to call sk-specific functions from svr-authpubkey
* Remove debugging output, which causes compilation errors with DEBUG_TRACE disabled
* Proper cleanup of sk_app
Co-authored-by: Matt Johnston <[email protected]>
author | egor-duda <egor-duda@users.noreply.github.com> |
---|---|
date | Sat, 22 Jan 2022 16:53:04 +0300 |
parents | e0c1825c567d |
children | b550845e500b |
line wrap: on
line source
import subprocess
import os
import pty
import tempfile
import logging
import time
import socketserver
import threading
import queue

import pytest

LOCALADDR = "127.0.5.5"


@pytest.fixture(scope="module")
def dropbear(request):
    """Module-scoped fixture that runs a local dropbear server.

    Yields the ``subprocess.Popen`` object of the running server, or
    ``None`` when the ``remote`` option is set (no local server is
    started in that case).
    """
    opt = request.config.option
    if opt.remote:
        yield None
        return

    args = [
        opt.dropbear,
        # Bind locally only. dropbear's -p takes "[address:]port", so the
        # address and the port have to be given as a single argument;
        # passing them as two separate -p options does not restrict the
        # listening address.
        "-p", f"{LOCALADDR}:{opt.port}",
        "-r", opt.hostkey,
        "-F", "-E",
    ]
    server = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
    # Wait until it has started listening
    for line in server.stderr:
        if "Not backgrounding" in line:
            break
    # Check it's still running
    assert server.poll() is None
    # Ready
    yield server
    server.terminate()
    print("Terminated dropbear. Flushing output:")
    for line in server.stderr:
        print(line.rstrip())
    print("Done")


def dbclient(request, *args, **kwargs):
    """Run dbclient against the server under test.

    Positional ``args`` are appended to the dbclient command line.  The
    keyword ``background`` selects the mode: when true a
    ``subprocess.Popen`` is returned immediately, otherwise the command is
    run to completion with ``subprocess.run`` (default timeout 10 seconds)
    and the ``CompletedProcess`` is returned.  All other keyword arguments
    are passed through to ``subprocess``.
    """
    opt = request.config.option
    host = opt.remote or LOCALADDR
    full_args = [opt.dbclient, "-y", host, "-p", opt.port]
    if opt.user:
        # Must be added to the list being built; the previous code extended
        # a name that had not been assigned yet (UnboundLocalError).
        full_args.extend(["-l", opt.user])
    full_args.extend(args)

    # "background" is our own option, subprocess must not see it
    if kwargs.pop("background", None):
        return subprocess.Popen(full_args, **kwargs)

    # wait for response
    kwargs.setdefault("timeout", 10)
    return subprocess.run(full_args, **kwargs)


class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """
    Listens for a single incoming request, sends a response if given,
    and returns the inbound data.
    Response can be a queue object, in which case each item in the queue will
    be sent as a response, until it receives a None item.

    Intended to be used as a context manager: the server runs in a
    background thread for the duration of the ``with`` block.
    """

    def __init__(self, port, timeout, response=None):
        super().__init__(('localhost', port), self.Handler)
        self.port = port
        self.timeout = timeout
        self.response = response
        # Data received from the client, filled in by the handler
        self.sink = None

    class Handler(socketserver.StreamRequestHandler):
        def handle(self):
            response = self.server.response
            if isinstance(response, queue.Queue):
                # Stream queue items until the None terminator
                while True:
                    item = response.get()
                    if item is None:
                        break
                    self.wfile.write(item)
            elif response:
                self.wfile.write(response)
            assert self.server.sink is None, ">1 request sent to handler"
            # Reads until the peer closes its sending side
            self.server.sink = self.rfile.read()

    def __enter__(self):
        self.server_thread = threading.Thread(target=self.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
        return self

    def __exit__(self, *exc_stuff):
        self.shutdown()
        self.server_thread.join()

    def inbound(self):
        """ Returns the data sent to the socket """
        return self.sink


def readall_socket(sock):
    """Read from ``sock`` until EOF and return all received data as bytes."""
    chunks = []
    while True:
        data = sock.recv(4096)
        if not data:
            break
        chunks.append(data)
    return b''.join(chunks)


def random_alnum(size):
    """Return a random ASCII alphanumeric ``str`` of exactly ``size`` characters."""
    found = bytearray()
    # Only a fraction of random bytes are alphanumeric, so oversample and
    # keep drawing until enough have been collected.
    while len(found) < size:
        raw = os.urandom(500 + size * 5)
        found.extend(b for b in raw if bytes((b,)).isalnum())
    return bytes(found[:size]).decode()