diff test/test_dropbear.py @ 1836:06c7ddbb9dd6

Add first channel tests These initial tests are checking various edge cases of channel handling that have cropped up over the years.
author Matt Johnston <matt@codeconstruct.com.au>
date Mon, 18 Oct 2021 14:22:37 +0800
parents
children d757f48ae29f
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/test_dropbear.py	Mon Oct 18 14:22:37 2021 +0800
@@ -0,0 +1,109 @@
+import subprocess
+import os
+import pty
+import tempfile
+import logging
+import time
+import socketserver
+import threading
+import queue
+
+import pytest
+
+LOCALADDR="127.0.5.5"
+
[email protected](scope="module")
+def dropbear(request):
+	opt = request.config.option
+	if opt.remote:
+		yield None
+		return
+
+	args = [opt.dropbear,
+		"-p", LOCALADDR, # bind locally only
+		"-r", opt.hostkey,
+		"-p", opt.port,
+		"-F", "-E",
+		]
+	p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
+	# Wait until it has started listening
+	for l in p.stderr:
+		if "Not backgrounding" in l:
+			break
+	# Check it's still running
+		assert p.poll() is None
+	# Ready
+	yield p
+	p.terminate()
+
+def dbclient(request, *args, **kwargs):
+	opt = request.config.option
+	host = opt.remote or LOCALADDR
+	base_args = [opt.dbclient, "-y", host, "-p", opt.port]
+	if opt.user:
+		full_args.extend(['-l', opt.user])
+	full_args = base_args + list(args)
+	bg = kwargs.get("background")
+	if "background" in kwargs:
+		del kwargs["background"]
+	if bg:
+		return subprocess.Popen(full_args, **kwargs)
+	else:
+		# wait for response
+		return subprocess.run(full_args, **kwargs)
+
+class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
+	""" Listens for a single incoming request, sends a response if given,
+	and returns the inbound data.
+	Reponse can be a queue object, in which case each item in the queue will
+	be sent as a response, until it receives a None item.
+	"""
+	def __init__(self, port, timeout, response=None):
+		super().__init__(('localhost', port), self.Handler)
+		self.port = 	port
+		self.timeout = timeout
+		self.response = response
+		self.sink = None
+
+	class Handler(socketserver.StreamRequestHandler):
+		def handle(self):
+			if isinstance(self.server.response, queue.Queue):
+				while True:
+					i = self.server.response.get()
+					if i is None:
+						break
+					self.wfile.write(i)
+			elif self.server.response:
+				self.wfile.write(self.server.response)
+			assert self.server.sink is None, ">1 request sent to handler"
+			self.server.sink = self.rfile.read()
+
+	def __enter__(self):
+		self.server_thread = threading.Thread(target=self.serve_forever)
+		self.server_thread.daemon = True
+		self.server_thread.start()
+		return self
+
+	def __exit__(self, *exc_stuff):
+		self.shutdown()
+		self.server_thread.join()
+
+	def inbound(self):
+		""" Returns the data sent to the socket """
+		return self.sink
+
+def readall_socket(sock):
+	b = []
+	while True:
+		i = sock.recv(4096)
+		if not i:
+			break
+		b.append(i)
+	return b''.join(b)
+
+# returns a str
+def random_alnum(size):
+	r = os.urandom(500 + size*5)
+	return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()
+
+