view test/ @ 1930:299f4f19ba19

Add /usr/sbin and /sbin to default root PATH. When dropbear is used in a very restricted environment (such as in an initrd), the default user shell is often also very restricted and doesn't take care of setting the PATH, so the user ends up with the PATH set by dropbear. Unfortunately, dropbear always sets "/usr/bin:/bin" as the default PATH, even for the root user, who should have /usr/sbin and /sbin too. For a concrete instance of this problem, see the "Remote Unlocking" section in this tutorial: It speaks of a bug in the initramfs script because it's written "blkid" instead of "/sbin/blkid"... this is just because the scripts from the initramfs do not expect to have a PATH without the sbin directories and because dropbear is not setting the PATH appropriately for the root user. I'm thus suggesting to use the attached patch to fix this misbehaviour (I did not test it, but it's easy enough). It might seem anecdotal, but multiple Kali users have been bitten by this.
author Raphael Hertzog <>
date Mon, 09 Jul 2018 16:27:53 +0200
parents 81991ded740f
line wrap: on
line source

import subprocess
import os
import pty
import tempfile
import logging
import time
import socketserver
import threading
import queue

import pytest


def dropbear(request):
	""" Starts a local dropbear server for the tests and yields its process.

	Yields None without starting anything when the "remote" option is set.
	Otherwise dropbear is launched with -F -E (stay in the foreground, log to
	stderr) and the subprocess.Popen object is yielded once the server has
	printed "Not backgrounding". When the consumer is done the server is
	terminated and whatever is left on its stderr is printed.

	LOCALADDR is expected to be defined at module level.
	NOTE(review): this generator is presumably registered as a pytest
	fixture by a decorator that is not visible here - confirm.
	"""
	opt = request.config.option
	if opt.remote:
		# Testing against a remote server, nothing to launch locally.
		yield None
		# Without this the generator would go on to start a local server.
		return

	# split so that "dropbearmulti dropbear" works
	args = opt.dropbear.split() + [
		"-p", LOCALADDR, # bind locally only
		"-r", opt.hostkey,
		"-p", opt.port,
		"-F", "-E",
	]
	p = subprocess.Popen(args, stderr=subprocess.PIPE, text=True)
	# Wait until it has started listening
	for l in p.stderr:
		if "Not backgrounding" in l:
			break
	# Check it's still running
	assert p.poll() is None
	# Ready
	yield p
	# Stop the server, otherwise reading stderr below never reaches EOF
	p.terminate()
	print("Terminated dropbear. Flushing output:")
	for l in p.stderr:
		print(l.rstrip())
	# Reap the child so it does not linger as a zombie
	p.wait()

def dbclient(request, *args, **kwargs):
	""" Runs dbclient against the server under test.

	The command line is the "dbclient" option (split on whitespace), "-y",
	the host (the "remote" option if set, otherwise LOCALADDR), "-p" and the
	"port" option, "-l <user>" if the "user" option is set, then *args.

	The keyword "background" is consumed here, every other keyword is passed
	through to subprocess. With a true "background" a subprocess.Popen is
	returned straight away. Otherwise the command is run to completion with
	subprocess.run(), using a timeout of 10 unless the caller gave one, and
	the subprocess.CompletedProcess is returned.
	"""
	opt = request.config.option
	host = opt.remote or LOCALADDR
	# split so that "dropbearmulti dbclient" works
	base_args = opt.dbclient.split() + ["-y", host, "-p", opt.port]
	if opt.user:
		base_args.extend(['-l', opt.user])
	full_args = base_args + list(args)
	# "background" is our own keyword, subprocess must not see it
	bg = kwargs.pop("background", None)
	if bg:
		return subprocess.Popen(full_args, **kwargs)
	kwargs.setdefault("timeout", 10)
	# wait for response
	return subprocess.run(full_args, **kwargs)

def own_venv_command():
	""" Returns a command to run as a prefix to get the same venv
	as the current running Python. Returns '' on not a virtualenv.

	The venv location is read from the VIRTUAL_ENV environment variable.
	"""
	try:
		venv = os.environ['VIRTUAL_ENV']
	except KeyError:
		# VIRTUAL_ENV is unset, so there is nothing to activate
		return ""

	# note: bash/zsh unix specific
	return f"source {venv}/bin/activate"

class HandleTcp(socketserver.ThreadingMixIn, socketserver.TCPServer):
	""" Listens for a single incoming request, sends a response if given,
	and returns the inbound data.
	Response can be a queue object, in which case each item in the queue will
	be sent as a response, until it receives a None item.

	Used as a context manager: entering starts serving on ('localhost', port)
	in a daemon thread, leaving stops the serving loop and joins that thread.
	The received data is then available from inbound().
	"""

	# override TCPServer's default, avoids TIME_WAIT
	# (the attribute socketserver reads is spelled allow_reuse_address)
	allow_reuse_address = True

	def __init__(self, port, timeout, response=None):
		""" Binds to ('localhost', port) and remembers the response to send.

		port, timeout and response are stored as attributes of the same name.
		response may be None, bytes to send, or a queue.Queue of items.
		"""
		super().__init__(('localhost', port), self.Handler)
		self.port = port
		self.timeout = timeout
		self.response = response
		# Inbound data of the one request, None until it has been handled
		self.sink = None

	class Handler(socketserver.StreamRequestHandler):
		def handle(self):
			""" Sends the server's response, then stores everything the
			peer sent (read until EOF) in the server's sink.
			"""
			if isinstance(self.server.response, queue.Queue):
				while True:
					i = self.server.response.get()
					if i is None:
						# None marks the end of the queued responses
						break
					self.wfile.write(i)
			elif self.server.response:
				self.wfile.write(self.server.response)
			assert self.server.sink is None, ">1 request sent to handler"
			self.server.sink = self.rfile.read()

	def __enter__(self):
		""" Starts serve_forever() in a daemon thread and returns self """
		self.server_thread = threading.Thread(target=self.serve_forever)
		self.server_thread.daemon = True
		self.server_thread.start()
		return self

	def __exit__(self, *exc_stuff):
		""" Stops the serve_forever() loop and waits for its thread """
		self.shutdown()
		self.server_thread.join()

	def inbound(self):
		""" Returns the data sent to the socket """
		return self.sink

def readall_socket(sock):
	""" Reads from sock in pieces of up to 4096 bytes until recv() returns
	nothing, and returns everything received as a single bytes object.
	"""
	b = []
	while True:
		i = sock.recv(4096)
		if not i:
			# an empty result ends the read
			break
		b.append(i)
	return b''.join(b)

# returns a str
def random_alnum(size):
	""" Returns a random string of ASCII letters and digits.

	500 + size*5 random bytes are drawn from os.urandom(), the alphanumeric
	ones are kept and the first size of them are decoded to a str.
	"""
	pool = os.urandom(size*5 + 500)
	kept = bytearray()
	for octet in pool:
		# isalnum() is a bytes method, so wrap the int in a 1-byte object
		if bytes((octet,)).isalnum():
			kept.append(octet)
	return bytes(kept[:size]).decode()