changeset 1836:06c7ddbb9dd6

Add first channel tests These initial tests are checking various edge cases of channel handling that have cropped up over the years.
author Matt Johnston <>
date Mon, 18 Oct 2021 14:22:37 +0800 (2021-10-18)
parents 90ac15aeac43
children df7bfd2f7d45
files test/ test/requirements.txt test/ test/
diffstat 4 files changed, 269 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/	Mon Oct 18 14:22:37 2021 +0800
@@ -0,0 +1,18 @@
+def pytest_addoption(parser):
+    parser.addoption("--port", type=str, help="default is 2244 local, 22 remote")
+    parser.addoption("--dbclient", type=str, default="../dbclient")
+    parser.addoption("--dropbear", type=str, default="../dropbear")
+    parser.addoption("--hostkey", type=str, help="required unless --remote")
+    parser.addoption("--remote", type=str, help="remote host")
+    parser.addoption("--user", type=str, help="optional username")
+def pytest_configure(config):
+    opt = config.option
+    if not opt.hostkey and not opt.remote:
+        raise Exception("--hostkey must be given")
+    if not opt.port:
+        if opt.remote:
+            opt.port = "22"
+        else:
+            opt.port = "2244"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/requirements.txt	Mon Oct 18 14:22:37 2021 +0800
@@ -0,0 +1,8 @@
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/	Mon Oct 18 14:22:37 2021 +0800
@@ -0,0 +1,134 @@
+from test_dropbear import *
+import signal
+import queue
+import socket
+# Tests for various edge cases of SSH channels and connection service
+def test_exitcode(request, dropbear):
+	r = dbclient(request, "exit 44")
+	assert r.returncode == 44
+@pytest.mark.xfail(reason="Not yet implemented", strict=True)
+def test_signal(request, dropbear):
+	r = dbclient(request, "kill -FPE $$")
+	assert r.returncode == -signal.SIGFPE
+@pytest.mark.parametrize("size", [0, 1, 2, 100, 5000, 200_000])
+def test_roundtrip(request, dropbear, size):
+	dat = os.urandom(size)
+	r = dbclient(request, "cat", input=dat, capture_output=True)
+	r.check_returncode()
+	assert r.stdout == dat
+@pytest.mark.parametrize("size", [0, 1, 2, 100, 20001, 41234])
+def test_read_pty(request, dropbear, size):
+	# testcase for
+	#
+	#
+	# From Yousong Zhou
+	# Fixed Oct 2021
+	#
+	#$ ssh -t my.router cat /tmp/bigfile | wc
+	#Connection to my.router closed.
+	#  0       1   14335 <- should be 20001
+	# Write the file. No newlines etc which could confuse ptys
+	dat = random_alnum(size)
+	r = dbclient(request, "tmpf=`mktemp`; echo $tmpf; cat > $tmpf", input=dat, capture_output=True, text=True)
+	tmpf = r.stdout.rstrip()
+	r.check_returncode()
+	# Read with a pty, this is what is being tested.
+	# Timing/buffering is subtle, we seem to need to cat a file from disk to hit it.
+	m, s = pty.openpty()
+	r = dbclient(request, "-t", f"cat {tmpf}; rm {tmpf}", stdin=s, capture_output=True)
+	r.check_returncode()
+	assert r.stdout.decode() == dat
+@pytest.mark.parametrize("fd", [1, 2])
+def test_bg_sleep(request, fd, dropbear):
+	#
+	# Rob Landley "Is this a bug?" 24 Mar 2006
+	# dbclient user@system "sleep 10& echo hello"
+	#
+	# It should return right after printing hello, but it doesn't.  It waits until
+	# the child process exits.
+	# failure is TimeoutExpired
+	redir = "" if fd == 1 else " >&2 "
+	r = dbclient(request, f"sleep 10& echo hello {redir}",
+		capture_output=True, timeout=2, text=True)
+	r.check_returncode()
+	st = r.stdout if fd == 1 else r.stderr
+	if fd == 2 and 'accepted unconditionally' in st:
+		# ignore hostkey warning, a bit of a hack
+		assert st.endswith("\n\nhello\n")
+	else:
+		assert st.rstrip() == "hello"
+def test_idle(request, dropbear):
+	# Idle test, -I 1 should make it return before the 2 second timeout
+	r = dbclient(request, "-I", "1", "echo zong; sleep 10",
+		capture_output=True, timeout=2, text=True)
+	r.check_returncode()
+	assert r.stdout.rstrip() == "zong"
+@pytest.mark.parametrize("size", [1, 4000, 40000])
+def test_netcat(request, dropbear, size):
+	opt = request.config.option
+	if opt.remote:
+		pytest.xfail("don't know netcat address for remote")
+	dat1 = os.urandom(size)
+	dat2 = os.urandom(size)
+	with HandleTcp(3344, 1, dat2) as tcp:
+		r = dbclient(request, "-B", "localhost:3344", input=dat1, capture_output=True)
+		r.check_returncode()
+		assert r.stdout == dat2
+		assert tcp.inbound() == dat1
+@pytest.mark.parametrize("size", [1, 4000, 40000])
+@pytest.mark.parametrize("fwd_flag", "LR")
+def test_tcpflushout(request, dropbear, size, fwd_flag):
+	""" Tests that an opened TCP connection prevent a SSH session from being closed
+	until that TCP connection has finished transferring
+	"""
+	opt = request.config.option
+	if opt.remote:
+		pytest.xfail("don't know address for remote")
+	dat1 = os.urandom(size)
+	dat2 = os.urandom(size)
+	q = queue.Queue()
+	with HandleTcp(3344, timeout=1, response=q) as tcp:
+		r = dbclient(request, f"-{fwd_flag}", "7788:localhost:3344", "sleep 0.1; echo -n done",
+			text=True, background=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
+		# time to let the listener start
+		time.sleep(0.1)
+		# open a tcp connection
+		c = socket.create_connection(("localhost", 7788))
+		# wait for the shell to finish. sleep a bit longer in case it exits.
+		assert == "done"
+		time.sleep(0.1)
+		# now the shell has finished, we can write on the tcp socket
+		c.sendall(dat2)
+		c.shutdown(socket.SHUT_WR)
+		q.put(dat1)
+		# return a tcp response
+		q.put(None)
+		# check hasn't exited
+		assert r.poll() == None
+		# read the response
+		assert readall_socket(c) == dat1
+		c.close()
+		assert tcp.inbound() == dat2
+		# check has exited, allow time for dbclient to exit
+		time.sleep(0.1)
+		assert r.poll() == 0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/	Mon Oct 18 14:22:37 2021 +0800
@@ -0,0 +1,109 @@
+import subprocess
+import os
+import pty
+import tempfile
+import logging
+import time
+import socketserver
+import threading
+import queue
+import pytest
+def dropbear(request):
+	opt = request.config.option
+	if opt.remote:
+		yield None
+		return
+	args = [opt.dropbear,
+		"-p", LOCALADDR, # bind locally only
+		"-r", opt.hostkey,
+		"-p", opt.port,
+		"-F", "-E",
+		]
+	p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
+	# Wait until it has started listening
+	for l in p.stderr:
+		if "Not backgrounding" in l:
+			break
+	# Check it's still running
+		assert p.poll() is None
+	# Ready
+	yield p
+	p.terminate()
+def dbclient(request, *args, **kwargs):
+	opt = request.config.option
+	host = opt.remote or LOCALADDR
+	base_args = [opt.dbclient, "-y", host, "-p", opt.port]
+	if opt.user:
+		full_args.extend(['-l', opt.user])
+	full_args = base_args + list(args)
+	bg = kwargs.get("background")
+	if "background" in kwargs:
+		del kwargs["background"]
+	if bg:
+		return subprocess.Popen(full_args, **kwargs)
+	else:
+		# wait for response
+		return, **kwargs)
+class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
+	""" Listens for a single incoming request, sends a response if given,
+	and returns the inbound data.
+	Reponse can be a queue object, in which case each item in the queue will
+	be sent as a response, until it receives a None item.
+	"""
+	def __init__(self, port, timeout, response=None):
+		super().__init__(('localhost', port), self.Handler)
+		self.port = 	port
+		self.timeout = timeout
+		self.response = response
+		self.sink = None
+	class Handler(socketserver.StreamRequestHandler):
+		def handle(self):
+			if isinstance(self.server.response, queue.Queue):
+				while True:
+					i = self.server.response.get()
+					if i is None:
+						break
+					self.wfile.write(i)
+			elif self.server.response:
+				self.wfile.write(self.server.response)
+			assert self.server.sink is None, ">1 request sent to handler"
+			self.server.sink =
+	def __enter__(self):
+		self.server_thread = threading.Thread(target=self.serve_forever)
+		self.server_thread.daemon = True
+		self.server_thread.start()
+		return self
+	def __exit__(self, *exc_stuff):
+		self.shutdown()
+		self.server_thread.join()
+	def inbound(self):
+		""" Returns the data sent to the socket """
+		return self.sink
+def readall_socket(sock):
+	b = []
+	while True:
+		i = sock.recv(4096)
+		if not i:
+			break
+		b.append(i)
+	return b''.join(b)
+# returns a str
+def random_alnum(size):
+	r = os.urandom(500 + size*5)
+	return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()