Mercurial > dropbear
comparison test/test_channels.py @ 1836:06c7ddbb9dd6
Add first channel tests
These initial tests are checking various edge cases of channel handling
that have cropped up over the years.
author | Matt Johnston <matt@codeconstruct.com.au> |
---|---|
date | Mon, 18 Oct 2021 14:22:37 +0800 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
1835:90ac15aeac43 | 1836:06c7ddbb9dd6 |
---|---|
1 from test_dropbear import * | |
2 import signal | |
3 import queue | |
4 import socket | |
5 | |
6 # Tests for various edge cases of SSH channels and connection service | |
7 | |
def test_exitcode(request, dropbear):
    """Run ``exit 44`` through dbclient and expect 44 as its return code."""
    result = dbclient(request, "exit 44")
    expected_status = 44
    assert result.returncode == expected_status
11 | |
@pytest.mark.xfail(reason="Not yet implemented", strict=True)
def test_signal(request, dropbear):
    """Expect a command that kills itself with SIGFPE to yield ``-SIGFPE``.

    Marked as a strict expected failure ("Not yet implemented"), so an
    unexpected pass is reported as an error.
    """
    expected_status = -signal.SIGFPE
    result = dbclient(request, "kill -FPE $$")
    assert result.returncode == expected_status
16 | |
@pytest.mark.parametrize("size", [0, 1, 2, 100, 5000, 200_000])
def test_roundtrip(request, dropbear, size):
    """Pipe ``size`` random bytes through a remote ``cat`` and expect them back unchanged."""
    payload = os.urandom(size)
    proc = dbclient(request, "cat", input=payload, capture_output=True)
    proc.check_returncode()
    echoed = proc.stdout
    assert echoed == payload
23 | |
@pytest.mark.parametrize("size", [0, 1, 2, 100, 20001, 41234])
def test_read_pty(request, dropbear, size):
    """Check that a file read back through a ``-t`` (pty) session is not truncated.

    A temporary file holding ``size`` alphanumeric characters is created on
    the far side, then read back with ``cat`` while dbclient's stdin is the
    slave end of a local pty. The captured output must equal what was written.
    """
    # testcase for
    # https://bugs.openwrt.org/index.php?do=details&task_id=1814
    # https://github.com/mkj/dropbear/pull/85
    # From Yousong Zhou
    # Fixed Oct 2021
    #
    #$ ssh -t my.router cat /tmp/bigfile | wc
    #Connection to my.router closed.
    # 0 1 14335 <- should be 20001

    # Function-scope import so the descriptor cleanup below does not depend
    # on what the star-import at the top of the file happens to re-export.
    import os

    # Write the file. No newlines etc which could confuse ptys
    dat = random_alnum(size)
    r = dbclient(request, "tmpf=`mktemp`; echo $tmpf; cat > $tmpf", input=dat, capture_output=True, text=True)
    # Fail on a broken setup command before trusting its output as a path.
    r.check_returncode()
    tmpf = r.stdout.rstrip()
    # Read with a pty, this is what is being tested.
    # Timing/buffering is subtle, we seem to need to cat a file from disk to hit it.
    m, s = pty.openpty()
    try:
        r = dbclient(request, "-t", f"cat {tmpf}; rm {tmpf}", stdin=s, capture_output=True)
    finally:
        # openpty() returns two raw descriptors that nothing else owns;
        # close both so each parametrized case does not leak a pty pair.
        os.close(s)
        os.close(m)
    r.check_returncode()
    assert r.stdout.decode() == dat
47 | |
@pytest.mark.parametrize("fd", [1, 2])
def test_bg_sleep(request, fd, dropbear):
    """Check dbclient returns promptly when the command leaves a background job.

    ``hello`` is echoed to stdout (``fd == 1``) or redirected to stderr
    (otherwise) while a ``sleep 10`` runs in the background; the 2 second
    timeout turns a hang into a failure.
    """
    # https://lists.ucc.asn.au/pipermail/dropbear/2006q1/000362.html
    # Rob Landley "Is this a bug?" 24 Mar 2006
    # dbclient user@system "sleep 10& echo hello"
    #
    # It should return right after printing hello, but it doesn't. It waits until
    # the child process exits.

    # failure is TimeoutExpired
    to_stdout = fd == 1
    redir = "" if to_stdout else " >&2 "
    command = f"sleep 10& echo hello {redir}"
    result = dbclient(request, command, capture_output=True, timeout=2, text=True)
    result.check_returncode()
    if to_stdout:
        output = result.stdout
    else:
        output = result.stderr

    hostkey_warning = fd == 2 and 'accepted unconditionally' in output
    if not hostkey_warning:
        assert output.rstrip() == "hello"
        return

    # ignore hostkey warning, a bit of a hack
    assert output.endswith("\n\nhello\n")
69 | |
70 | |
def test_idle(request, dropbear):
    """Run dbclient with ``-I 1`` against a command that sleeps for 10 seconds.

    The call must finish successfully inside the 2 second timeout with
    ``zong`` on stdout.
    """
    # Idle test, -I 1 should make it return before the 2 second timeout
    idle_args = ("-I", "1")
    result = dbclient(request, *idle_args, "echo zong; sleep 10",
                      capture_output=True, timeout=2, text=True)
    result.check_returncode()
    printed = result.stdout.rstrip()
    assert printed == "zong"
77 | |
@pytest.mark.parametrize("size", [1, 4000, 40000])
def test_netcat(request, dropbear, size):
    """Exchange ``size`` random bytes each way through ``dbclient -B localhost:3344``.

    The bytes given to ``HandleTcp`` must arrive on dbclient's stdout, and the
    bytes fed to dbclient's stdin must be what ``HandleTcp.inbound()`` reports.
    Expected to fail when the ``remote`` option is set.
    """
    if request.config.option.remote:
        pytest.xfail("don't know netcat address for remote")

    sent_by_client = os.urandom(size)
    sent_by_listener = os.urandom(size)
    with HandleTcp(3344, 1, sent_by_listener) as listener:
        proc = dbclient(request, "-B", "localhost:3344",
                        input=sent_by_client, capture_output=True)
        proc.check_returncode()
        assert proc.stdout == sent_by_listener
        assert listener.inbound() == sent_by_client
91 | |
@pytest.mark.parametrize("size", [1, 4000, 40000])
@pytest.mark.parametrize("fwd_flag", "LR")
def test_tcpflushout(request, dropbear, size, fwd_flag):
    """ Tests that an open TCP connection prevents an SSH session from being closed
    until that TCP connection has finished transferring.

    Runs once per forwarding flag (``-L`` and ``-R``) and payload size.
    Expected to fail when the ``remote`` option is set.
    """
    opt = request.config.option
    if opt.remote:
        pytest.xfail("don't know address for remote")

    dat1 = os.urandom(size)
    dat2 = os.urandom(size)
    q = queue.Queue()
    with HandleTcp(3344, timeout=1, response=q) as tcp:

        r = dbclient(request, f"-{fwd_flag}", "7788:localhost:3344", "sleep 0.1; echo -n done",
            text=True, background=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        # time to let the listener start
        time.sleep(0.1)
        # open a tcp connection; the context manager closes the socket even
        # when one of the assertions below fails
        with socket.create_connection(("localhost", 7788)) as c:

            # wait for the shell to finish. sleep a bit longer in case it exits.
            assert r.stdout.read(4) == "done"
            time.sleep(0.1)

            # now the shell has finished, we can write on the tcp socket
            c.sendall(dat2)
            c.shutdown(socket.SHUT_WR)
            q.put(dat1)

            # return a tcp response
            q.put(None)
            # check hasn't exited
            assert r.poll() is None

            # read the response
            assert readall_socket(c) == dat1
        # the socket is closed at this point, before inbound data is checked
        assert tcp.inbound() == dat2
        # check has exited, allow time for dbclient to exit
        time.sleep(0.1)
        assert r.poll() == 0