view test/test_dropbear.py @ 1860:5001e9c5641f

Merge netio changes
author Matt Johnston <matt@ucc.asn.au>
date Thu, 27 Jan 2022 15:09:29 +0800
parents e0c1825c567d
children b550845e500b
line wrap: on
line source

import subprocess
import os
import pty
import tempfile
import logging
import time
import socketserver
import threading
import queue

import pytest

LOCALADDR="127.0.5.5"

@pytest.fixture(scope="module")
def dropbear(request):
	opt = request.config.option
	if opt.remote:
		yield None
		return

	args = [opt.dropbear,
		"-p", LOCALADDR, # bind locally only
		"-r", opt.hostkey,
		"-p", opt.port,
		"-F", "-E",
		]
	p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
	# Wait until it has started listening
	for l in p.stderr:
		if "Not backgrounding" in l:
			break
	# Check it's still running
		assert p.poll() is None
	# Ready
	yield p
	p.terminate()
	print("Terminated dropbear. Flushing output:")
	for l in p.stderr:
		print(l.rstrip())
	print("Done")

def dbclient(request, *args, **kwargs):
	opt = request.config.option
	host = opt.remote or LOCALADDR
	base_args = [opt.dbclient, "-y", host, "-p", opt.port]
	if opt.user:
		full_args.extend(['-l', opt.user])
	full_args = base_args + list(args)
	bg = kwargs.get("background")
	if "background" in kwargs:
		del kwargs["background"]
	if bg:
		return subprocess.Popen(full_args, **kwargs)
	else:
		kwargs.setdefault("timeout", 10)
		# wait for response
		return subprocess.run(full_args, **kwargs)

class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
	""" Listens for a single incoming request, sends a response if given,
	and returns the inbound data.
	Reponse can be a queue object, in which case each item in the queue will
	be sent as a response, until it receives a None item.
	"""
	def __init__(self, port, timeout, response=None):
		super().__init__(('localhost', port), self.Handler)
		self.port = 	port
		self.timeout = timeout
		self.response = response
		self.sink = None

	class Handler(socketserver.StreamRequestHandler):
		def handle(self):
			if isinstance(self.server.response, queue.Queue):
				while True:
					i = self.server.response.get()
					if i is None:
						break
					self.wfile.write(i)
			elif self.server.response:
				self.wfile.write(self.server.response)
			assert self.server.sink is None, ">1 request sent to handler"
			self.server.sink = self.rfile.read()

	def __enter__(self):
		self.server_thread = threading.Thread(target=self.serve_forever)
		self.server_thread.daemon = True
		self.server_thread.start()
		return self

	def __exit__(self, *exc_stuff):
		self.shutdown()
		self.server_thread.join()

	def inbound(self):
		""" Returns the data sent to the socket """
		return self.sink

def readall_socket(sock):
	b = []
	while True:
		i = sock.recv(4096)
		if not i:
			break
		b.append(i)
	return b''.join(b)

# returns a str
def random_alnum(size):
	r = os.urandom(500 + size*5)
	return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()