view test/test_channels.py @ 1920:1489449eceb1

Check authorized_keys permissions as the user This is necessary on NFS with squash root. Based on work from Chris Dragan This commit also tidies some trailing whitespace. Fixes github pull #107
author Matt Johnston <matt@ucc.asn.au>
date Wed, 30 Mar 2022 12:56:09 +0800
parents 06c7ddbb9dd6
children
line wrap: on
line source

from test_dropbear import *
import signal
import queue
import socket

# Tests for various edge cases of SSH channels and connection service

def test_exitcode(request, dropbear):
	r = dbclient(request, "exit 44")
	assert r.returncode == 44

@pytest.mark.xfail(reason="Not yet implemented", strict=True)
def test_signal(request, dropbear):
	r = dbclient(request, "kill -FPE $$")
	assert r.returncode == -signal.SIGFPE

@pytest.mark.parametrize("size", [0, 1, 2, 100, 5000, 200_000])
def test_roundtrip(request, dropbear, size):
	dat = os.urandom(size)
	r = dbclient(request, "cat", input=dat, capture_output=True)
	r.check_returncode()
	assert r.stdout == dat

@pytest.mark.parametrize("size", [0, 1, 2, 100, 20001, 41234])
def test_read_pty(request, dropbear, size):
	# testcase for
	# https://bugs.openwrt.org/index.php?do=details&task_id=1814
	# https://github.com/mkj/dropbear/pull/85
	# From Yousong Zhou
	# Fixed Oct 2021
	#
	#$ ssh -t my.router cat /tmp/bigfile | wc
	#Connection to my.router closed.
	#  0       1   14335 <- should be 20001

	# Write the file. No newlines etc which could confuse ptys
	dat = random_alnum(size)
	r = dbclient(request, "tmpf=`mktemp`; echo $tmpf; cat > $tmpf", input=dat, capture_output=True, text=True)
	tmpf = r.stdout.rstrip()
	r.check_returncode()
	# Read with a pty, this is what is being tested.
	# Timing/buffering is subtle, we seem to need to cat a file from disk to hit it.
	m, s = pty.openpty()
	r = dbclient(request, "-t", f"cat {tmpf}; rm {tmpf}", stdin=s, capture_output=True)
	r.check_returncode()
	assert r.stdout.decode() == dat

@pytest.mark.parametrize("fd", [1, 2])
def test_bg_sleep(request, fd, dropbear):
	# https://lists.ucc.asn.au/pipermail/dropbear/2006q1/000362.html
	# Rob Landley "Is this a bug?" 24 Mar 2006
	# dbclient user@system "sleep 10& echo hello"
	#
	# It should return right after printing hello, but it doesn't.  It waits until
	# the child process exits.

	# failure is TimeoutExpired
	redir = "" if fd == 1 else " >&2 "
	r = dbclient(request, f"sleep 10& echo hello {redir}",
		capture_output=True, timeout=2, text=True)
	r.check_returncode()
	st = r.stdout if fd == 1 else r.stderr

	if fd == 2 and 'accepted unconditionally' in st:
		# ignore hostkey warning, a bit of a hack
		assert st.endswith("\n\nhello\n")
	else:
		assert st.rstrip() == "hello"


def test_idle(request, dropbear):
	# Idle test, -I 1 should make it return before the 2 second timeout
	r = dbclient(request, "-I", "1", "echo zong; sleep 10",
		capture_output=True, timeout=2, text=True)
	r.check_returncode()
	assert r.stdout.rstrip() == "zong"

@pytest.mark.parametrize("size", [1, 4000, 40000])
def test_netcat(request, dropbear, size):
	opt = request.config.option
	if opt.remote:
		pytest.xfail("don't know netcat address for remote")

	dat1 = os.urandom(size)
	dat2 = os.urandom(size)
	with HandleTcp(3344, 1, dat2) as tcp:
		r = dbclient(request, "-B", "localhost:3344", input=dat1, capture_output=True)
		r.check_returncode()
		assert r.stdout == dat2
		assert tcp.inbound() == dat1

@pytest.mark.parametrize("size", [1, 4000, 40000])
@pytest.mark.parametrize("fwd_flag", "LR")
def test_tcpflushout(request, dropbear, size, fwd_flag):
	""" Tests that an opened TCP connection prevent a SSH session from being closed
	until that TCP connection has finished transferring
	"""
	opt = request.config.option
	if opt.remote:
		pytest.xfail("don't know address for remote")

	dat1 = os.urandom(size)
	dat2 = os.urandom(size)
	q = queue.Queue()
	with HandleTcp(3344, timeout=1, response=q) as tcp:

		r = dbclient(request, f"-{fwd_flag}", "7788:localhost:3344", "sleep 0.1; echo -n done",
			text=True, background=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
		# time to let the listener start
		time.sleep(0.1)
		# open a tcp connection
		c = socket.create_connection(("localhost", 7788))

		# wait for the shell to finish. sleep a bit longer in case it exits.
		assert r.stdout.read(4) == "done"
		time.sleep(0.1)

		# now the shell has finished, we can write on the tcp socket
		c.sendall(dat2)
		c.shutdown(socket.SHUT_WR)
		q.put(dat1)

		# return a tcp response
		q.put(None)
		# check hasn't exited
		assert r.poll() == None

		# read the response
		assert readall_socket(c) == dat1
		c.close()
		assert tcp.inbound() == dat2
		# check has exited, allow time for dbclient to exit
		time.sleep(0.1)
		assert r.poll() == 0