view test/ @ 1879:81991ded740f

test: Set allow_reuse_addr This avoids intermittent "Address already in use" failure
author Matt Johnston <>
date Tue, 15 Mar 2022 15:38:46 +0800
parents 0dcc5b0d93fa
line wrap: on
line source
import subprocess
import os
import pty
import tempfile
import logging
import time
import socketserver
import threading
import queue

import pytest


def dropbear(request):
	opt = request.config.option
	if opt.remote:
		yield None

	# split so that "dropbearmulti dropbear" works
	args = opt.dropbear.split() + [
		"-p", LOCALADDR, # bind locally only
		"-r", opt.hostkey,
		"-p", opt.port,
		"-F", "-E",
	p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
	# Wait until it has started listening
	for l in p.stderr:
		if "Not backgrounding" in l:
	# Check it's still running
		assert p.poll() is None
	# Ready
	yield p
	print("Terminated dropbear. Flushing output:")
	for l in p.stderr:

def dbclient(request, *args, **kwargs):
	opt = request.config.option
	host = opt.remote or LOCALADDR
	# split so that "dropbearmulti dbclient" works
	base_args = opt.dbclient.split() + ["-y", host, "-p", opt.port]
	if opt.user:
		base_args.extend(['-l', opt.user])
	full_args = base_args + list(args)
	bg = kwargs.get("background")
	if "background" in kwargs:
		del kwargs["background"]
	if bg:
		return subprocess.Popen(full_args, **kwargs)
		kwargs.setdefault("timeout", 10)
		# wait for response
		return, **kwargs)

def own_venv_command():
	""" Returns a command to run as a prefix to get the same venv
	as the current running Python. Returns '' on not a virtualenv
		venv = os.environ['VIRTUAL_ENV']
	except KeyError:
		return ""

	# note: bash/zsh unix specific
	return f"source {venv}/bin/activate"

class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):

	# override TCPServer's default, avoids TIME_WAIT
	allow_reuse_addr = True

	""" Listens for a single incoming request, sends a response if given,
	and returns the inbound data.
	Reponse can be a queue object, in which case each item in the queue will
	be sent as a response, until it receives a None item.
	def __init__(self, port, timeout, response=None):
		super().__init__(('localhost', port), self.Handler)
		self.port = 	port
		self.timeout = timeout
		self.response = response
		self.sink = None

	class Handler(socketserver.StreamRequestHandler):
		def handle(self):
			if isinstance(self.server.response, queue.Queue):
				while True:
					i = self.server.response.get()
					if i is None:
			elif self.server.response:
			assert self.server.sink is None, ">1 request sent to handler"
			self.server.sink =

	def __enter__(self):
		self.server_thread = threading.Thread(target=self.serve_forever)
		self.server_thread.daemon = True
		return self

	def __exit__(self, *exc_stuff):

	def inbound(self):
		""" Returns the data sent to the socket """
		return self.sink

def readall_socket(sock):
	b = []
	while True:
		i = sock.recv(4096)
		if not i:
	return b''.join(b)

# returns a str
def random_alnum(size):
	r = os.urandom(500 + size*5)
	return bytes(i for i in r if bytes((i,)).isalnum())[:size].decode()