comparison test/test_dropbear.py @ 1836:06c7ddbb9dd6

Add first channel tests These initial tests are checking various edge cases of channel handling that have cropped up over the years.
author Matt Johnston <matt@codeconstruct.com.au>
date Mon, 18 Oct 2021 14:22:37 +0800
parents
children d757f48ae29f
comparison
equal deleted inserted replaced
1835:90ac15aeac43 1836:06c7ddbb9dd6
1 import subprocess
2 import os
3 import pty
4 import tempfile
5 import logging
6 import time
7 import socketserver
8 import threading
9 import queue
10
11 import pytest
12
13 LOCALADDR="127.0.5.5"
14
15 @pytest.fixture(scope="module")
16 def dropbear(request):
17 opt = request.config.option
18 if opt.remote:
19 yield None
20 return
21
22 args = [opt.dropbear,
23 "-p", LOCALADDR, # bind locally only
24 "-r", opt.hostkey,
25 "-p", opt.port,
26 "-F", "-E",
27 ]
28 p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
29 # Wait until it has started listening
30 for l in p.stderr:
31 if "Not backgrounding" in l:
32 break
33 # Check it's still running
34 assert p.poll() is None
35 # Ready
36 yield p
37 p.terminate()
38
39 def dbclient(request, *args, **kwargs):
40 opt = request.config.option
41 host = opt.remote or LOCALADDR
42 base_args = [opt.dbclient, "-y", host, "-p", opt.port]
43 if opt.user:
44 full_args.extend(['-l', opt.user])
45 full_args = base_args + list(args)
46 bg = kwargs.get("background")
47 if "background" in kwargs:
48 del kwargs["background"]
49 if bg:
50 return subprocess.Popen(full_args, **kwargs)
51 else:
52 # wait for response
53 return subprocess.run(full_args, **kwargs)
54
55 class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
56 """ Listens for a single incoming request, sends a response if given,
57 and returns the inbound data.
58 Reponse can be a queue object, in which case each item in the queue will
59 be sent as a response, until it receives a None item.
60 """
61 def __init__(self, port, timeout, response=None):
62 super().__init__(('localhost', port), self.Handler)
63 self.port = port
64 self.timeout = timeout
65 self.response = response
66 self.sink = None
67
68 class Handler(socketserver.StreamRequestHandler):
69 def handle(self):
70 if isinstance(self.server.response, queue.Queue):
71 while True:
72 i = self.server.response.get()
73 if i is None:
74 break
75 self.wfile.write(i)
76 elif self.server.response:
77 self.wfile.write(self.server.response)
78 assert self.server.sink is None, ">1 request sent to handler"
79 self.server.sink = self.rfile.read()
80
81 def __enter__(self):
82 self.server_thread = threading.Thread(target=self.serve_forever)
83 self.server_thread.daemon = True
84 self.server_thread.start()
85 return self
86
87 def __exit__(self, *exc_stuff):
88 self.shutdown()
89 self.server_thread.join()
90
91 def inbound(self):
92 """ Returns the data sent to the socket """
93 return self.sink
94
95 def readall_socket(sock):
96 b = []
97 while True:
98 i = sock.recv(4096)
99 if not i:
100 break
101 b.append(i)
102 return b''.join(b)
103
104 # returns a str
105 def random_alnum(size):
106 r = os.urandom(500 + size*5)
107 return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()
108
109