Merge pull request #71684 from tfc/integration-test-python

nixos: add python testing support
This commit is contained in:
Florian Klink 2019-11-05 00:06:00 +01:00 committed by GitHub
commit 22906321fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 1348 additions and 282 deletions

View file

@ -14,14 +14,14 @@
starting VDE switch for network 1
<prompt>&gt;</prompt>
</screen>
You can then take any Perl statement, e.g.
You can then take any Python statement, e.g.
<screen>
<prompt>&gt;</prompt> startAll
<prompt>&gt;</prompt> testScript
<prompt>&gt;</prompt> $machine->succeed("touch /tmp/foo")
<prompt>&gt;</prompt> print($machine->succeed("pwd")) # Show stdout of command
<prompt>&gt;</prompt> start_all()
<prompt>&gt;</prompt> test_script()
<prompt>&gt;</prompt> machine.succeed("touch /tmp/foo")
<prompt>&gt;</prompt> print(machine.succeed("pwd")) # Show stdout of command
</screen>
The function <command>testScript</command> executes the entire test script
The function <command>test_script</command> executes the entire test script
and drops you back into the test driver command line upon its completion.
This allows you to inspect the state of the VMs after the test (e.g. to debug
the test script).

View file

@ -8,7 +8,7 @@
<para>
A NixOS test is a Nix expression that has the following structure:
<programlisting>
import ./make-test.nix {
import ./make-test-python.nix {
# Either the configuration of a single machine:
machine =
@ -27,11 +27,11 @@ import ./make-test.nix {
testScript =
''
<replaceable>Perl code…</replaceable>
<replaceable>Python code…</replaceable>
'';
}
</programlisting>
The attribute <literal>testScript</literal> is a bit of Perl code that
The attribute <literal>testScript</literal> is a bit of Python code that
executes the test (described below). During the test, it will start one or
more virtual machines, the configuration of which is described by the
attribute <literal>machine</literal> (if you need only one machine in your
@ -96,26 +96,27 @@ xlink:href="https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/virtualis
</para>
<para>
The test script is a sequence of Perl statements that perform various
The test script is a sequence of Python statements that perform various
actions, such as starting VMs, executing commands in the VMs, and so on. Each
virtual machine is represented as an object stored in the variable
<literal>$<replaceable>name</replaceable></literal>, where
<replaceable>name</replaceable> is the identifier of the machine (which is
just <literal>machine</literal> if you didnt specify multiple machines
using the <literal>nodes</literal> attribute). For instance, the following
starts the machine, waits until it has finished booting, then executes a
command and checks that the output is more-or-less correct:
<literal><replaceable>name</replaceable></literal> if this is also the
identifier of the machine in the declarative config.
If you didn't specify multiple machines using the <literal>nodes</literal>
attribute, it is just <literal>machine</literal>.
The following example starts the machine, waits until it has finished booting,
then executes a command and checks that the output is more-or-less correct:
<programlisting>
$machine->start;
$machine->waitForUnit("default.target");
$machine->succeed("uname") =~ /Linux/ or die;
machine.start()
machine.wait_for_unit("default.target")
if not "Linux" in machine.succeed("uname"):
raise Exception("Wrong OS")
</programlisting>
The first line is actually unnecessary; machines are implicitly started when
you first execute an action on them (such as <literal>waitForUnit</literal>
you first execute an action on them (such as <literal>wait_for_unit</literal>
or <literal>succeed</literal>). If you have multiple machines, you can speed
up the test by starting them in parallel:
<programlisting>
startAll;
start_all()
</programlisting>
</para>
@ -187,7 +188,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>getScreenText</methodname>
<methodname>get_screen_text</methodname>
</term>
<listitem>
<para>
@ -204,7 +205,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>sendMonitorCommand</methodname>
<methodname>send_monitor_command</methodname>
</term>
<listitem>
<para>
@ -215,23 +216,23 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>sendKeys</methodname>
<methodname>send_keys</methodname>
</term>
<listitem>
<para>
Simulate pressing keys on the virtual keyboard, e.g.,
<literal>sendKeys("ctrl-alt-delete")</literal>.
<literal>send_keys("ctrl-alt-delete")</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<methodname>sendChars</methodname>
<methodname>send_chars</methodname>
</term>
<listitem>
<para>
Simulate typing a sequence of characters on the virtual keyboard, e.g.,
<literal>sendKeys("foobar\n")</literal> will type the string
<literal>send_keys("foobar\n")</literal> will type the string
<literal>foobar</literal> followed by the Enter key.
</para>
</listitem>
@ -272,7 +273,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitUntilSucceeds</methodname>
<methodname>wait_until_succeeds</methodname>
</term>
<listitem>
<para>
@ -282,7 +283,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitUntilFails</methodname>
<methodname>wait_until_fails</methodname>
</term>
<listitem>
<para>
@ -292,7 +293,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitForUnit</methodname>
<methodname>wait_for_unit</methodname>
</term>
<listitem>
<para>
@ -302,7 +303,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitForFile</methodname>
<methodname>wait_for_file</methodname>
</term>
<listitem>
<para>
@ -312,7 +313,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitForOpenPort</methodname>
<methodname>wait_for_open_port</methodname>
</term>
<listitem>
<para>
@ -323,7 +324,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitForClosedPort</methodname>
<methodname>wait_for_closed_port</methodname>
</term>
<listitem>
<para>
@ -333,7 +334,7 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitForX</methodname>
<methodname>wait_for_x</methodname>
</term>
<listitem>
<para>
@ -343,13 +344,13 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitForText</methodname>
<methodname>wait_for_text</methodname>
</term>
<listitem>
<para>
Wait until the supplied regular expressions matches the textual contents
of the screen by using optical character recognition (see
<methodname>getScreenText</methodname>).
<methodname>get_screen_text</methodname>).
</para>
<note>
<para>
@ -361,23 +362,23 @@ startAll;
</varlistentry>
<varlistentry>
<term>
<methodname>waitForWindow</methodname>
<methodname>wait_for_window</methodname>
</term>
<listitem>
<para>
Wait until an X11 window has appeared whose name matches the given
regular expression, e.g., <literal>waitForWindow(qr/Terminal/)</literal>.
regular expression, e.g., <literal>wait_for_window("Terminal")</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
<methodname>copyFileFromHost</methodname>
<methodname>copy_file_from_host</methodname>
</term>
<listitem>
<para>
Copies a file from host to machine, e.g.,
<literal>copyFileFromHost("myfile", "/etc/my/important/file")</literal>.
<literal>copy_file_from_host("myfile", "/etc/my/important/file")</literal>.
</para>
<para>
The first argument is the file on the host. The file needs to be
@ -397,8 +398,8 @@ startAll;
</para>
<para>
<programlisting>
$machine->systemctl("list-jobs --no-pager"); // runs `systemctl list-jobs --no-pager`
$machine->systemctl("list-jobs --no-pager", "any-user"); // spawns a shell for `any-user` and runs `systemctl --user list-jobs --no-pager`
machine.systemctl("list-jobs --no-pager") # runs `systemctl list-jobs --no-pager`
machine.systemctl("list-jobs --no-pager", "any-user") # spawns a shell for `any-user` and runs `systemctl --user list-jobs --no-pager`
</programlisting>
</para>
</listitem>
@ -408,14 +409,14 @@ $machine->systemctl("list-jobs --no-pager", "any-user"); // spawns a shell for `
<para>
To test user units declared by <literal>systemd.user.services</literal> the
optional <literal>$user</literal> argument can be used:
optional <literal>user</literal> argument can be used:
<programlisting>
$machine->start;
$machine->waitForX;
$machine->waitForUnit("xautolock.service", "x-session-user");
machine.start()
machine.wait_for_x()
machine.wait_for_unit("xautolock.service", "x-session-user")
</programlisting>
This applies to <literal>systemctl</literal>, <literal>getUnitInfo</literal>,
<literal>waitForUnit</literal>, <literal>startJob</literal> and
<literal>stopJob</literal>.
This applies to <literal>systemctl</literal>, <literal>get_unit_info</literal>,
<literal>wait_for_unit</literal>, <literal>start_job</literal> and
<literal>stop_job</literal>.
</para>
</section>

View file

@ -0,0 +1,758 @@
#! /somewhere/python3
from contextlib import contextmanager
from xml.sax.saxutils import XMLGenerator
import _thread
import atexit
import os
import pty
import queue
import re
import shutil
import socket
import subprocess
import sys
import tempfile
import time
import unicodedata
import ptpython.repl
CHAR_TO_KEY = {
"A": "shift-a",
"N": "shift-n",
"-": "0x0C",
"_": "shift-0x0C",
"B": "shift-b",
"O": "shift-o",
"=": "0x0D",
"+": "shift-0x0D",
"C": "shift-c",
"P": "shift-p",
"[": "0x1A",
"{": "shift-0x1A",
"D": "shift-d",
"Q": "shift-q",
"]": "0x1B",
"}": "shift-0x1B",
"E": "shift-e",
"R": "shift-r",
";": "0x27",
":": "shift-0x27",
"F": "shift-f",
"S": "shift-s",
"'": "0x28",
'"': "shift-0x28",
"G": "shift-g",
"T": "shift-t",
"`": "0x29",
"~": "shift-0x29",
"H": "shift-h",
"U": "shift-u",
"\\": "0x2B",
"|": "shift-0x2B",
"I": "shift-i",
"V": "shift-v",
",": "0x33",
"<": "shift-0x33",
"J": "shift-j",
"W": "shift-w",
".": "0x34",
">": "shift-0x34",
"K": "shift-k",
"X": "shift-x",
"/": "0x35",
"?": "shift-0x35",
"L": "shift-l",
"Y": "shift-y",
" ": "spc",
"M": "shift-m",
"Z": "shift-z",
"\n": "ret",
"!": "shift-0x02",
"@": "shift-0x03",
"#": "shift-0x04",
"$": "shift-0x05",
"%": "shift-0x06",
"^": "shift-0x07",
"&": "shift-0x08",
"*": "shift-0x09",
"(": "shift-0x0A",
")": "shift-0x0B",
}
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
def create_vlan(vlan_nr):
global log
log.log("starting VDE switch for network {}".format(vlan_nr))
vde_socket = os.path.abspath("./vde{}.ctl".format(vlan_nr))
pty_master, pty_slave = pty.openpty()
vde_process = subprocess.Popen(
["vde_switch", "-s", vde_socket, "--dirmode", "0777"],
bufsize=1,
stdin=pty_slave,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
)
fd = os.fdopen(pty_master, "w")
fd.write("version\n")
# TODO: perl version checks if this can be read from
# an if not, dies. we could hang here forever. Fix it.
vde_process.stdout.readline()
if not os.path.exists(os.path.join(vde_socket, "ctl")):
raise Exception("cannot start vde_switch")
return (vlan_nr, vde_socket, vde_process, fd)
def retry(fn):
"""Call the given function repeatedly, with 1 second intervals,
until it returns True or a timeout is reached.
"""
for _ in range(900):
if fn(False):
return
time.sleep(1)
if not fn(True):
raise Exception("action timed out")
class Logger:
def __init__(self):
self.logfile = os.environ.get("LOGFILE", "/dev/null")
self.logfile_handle = open(self.logfile, "wb")
self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
self.queue = queue.Queue(1000)
self.xml.startDocument()
self.xml.startElement("logfile", attrs={})
def close(self):
self.xml.endElement("logfile")
self.xml.endDocument()
self.logfile_handle.close()
def sanitise(self, message):
return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
def maybe_prefix(self, message, attributes):
if "machine" in attributes:
return "{}: {}".format(attributes["machine"], message)
return message
def log_line(self, message, attributes):
self.xml.startElement("line", attributes)
self.xml.characters(message)
self.xml.endElement("line")
def log(self, message, attributes={}):
eprint(self.maybe_prefix(message, attributes))
self.drain_log_queue()
self.log_line(message, attributes)
def enqueue(self, message):
self.queue.put(message)
def drain_log_queue(self):
try:
while True:
item = self.queue.get_nowait()
attributes = {"machine": item["machine"], "type": "serial"}
self.log_line(self.sanitise(item["msg"]), attributes)
except queue.Empty:
pass
@contextmanager
def nested(self, message, attributes={}):
eprint(self.maybe_prefix(message, attributes))
self.xml.startElement("nest", attrs={})
self.xml.startElement("head", attributes)
self.xml.characters(message)
self.xml.endElement("head")
tic = time.time()
self.drain_log_queue()
yield
self.drain_log_queue()
toc = time.time()
self.log("({:.2f} seconds)".format(toc - tic))
self.xml.endElement("nest")
class Machine:
def __init__(self, args):
if "name" in args:
self.name = args["name"]
else:
self.name = "machine"
try:
cmd = args["startCommand"]
self.name = re.search("run-(.+)-vm$", cmd).group(1)
except KeyError:
pass
except AttributeError:
pass
self.script = args.get("startCommand", self.create_startcommand(args))
tmp_dir = os.environ.get("TMPDIR", tempfile.gettempdir())
def create_dir(name):
path = os.path.join(tmp_dir, name)
os.makedirs(path, mode=0o700, exist_ok=True)
return path
self.state_dir = create_dir("vm-state-{}".format(self.name))
self.shared_dir = create_dir("xchg-shared")
self.booted = False
self.connected = False
self.pid = None
self.socket = None
self.monitor = None
self.logger = args["log"]
self.allow_reboot = args.get("allowReboot", False)
@staticmethod
def create_startcommand(args):
net_backend = "-netdev user,id=net0"
net_frontend = "-device virtio-net-pci,netdev=net0"
if "netBackendArgs" in args:
net_backend += "," + args["netBackendArgs"]
if "netFrontendArgs" in args:
net_frontend += "," + args["netFrontendArgs"]
start_command = (
"qemu-kvm -m 384 " + net_backend + " " + net_frontend + " $QEMU_OPTS "
)
if "hda" in args:
hda_path = os.path.abspath(args["hda"])
if args.get("hdaInterface", "") == "scsi":
start_command += (
"-drive id=hda,file="
+ hda_path
+ ",werror=report,if=none "
+ "-device scsi-hd,drive=hda "
)
else:
start_command += (
"-drive file="
+ hda_path
+ ",if="
+ args["hdaInterface"]
+ ",werror=report "
)
if "cdrom" in args:
start_command += "-cdrom " + args["cdrom"] + " "
if "usb" in args:
start_command += (
"-device piix3-usb-uhci -drive "
+ "id=usbdisk,file="
+ args["usb"]
+ ",if=none,readonly "
+ "-device usb-storage,drive=usbdisk "
)
if "bios" in args:
start_command += "-bios " + args["bios"] + " "
start_command += args.get("qemuFlags", "")
return start_command
def is_up(self):
return self.booted and self.connected
def log(self, msg):
self.logger.log(msg, {"machine": self.name})
def nested(self, msg, attrs={}):
my_attrs = {"machine": self.name}
my_attrs.update(attrs)
return self.logger.nested(msg, my_attrs)
def wait_for_monitor_prompt(self):
while True:
answer = self.monitor.recv(1024).decode()
if answer.endswith("(qemu) "):
return answer
def send_monitor_command(self, command):
message = ("{}\n".format(command)).encode()
self.log("sending monitor command: {}".format(command))
self.monitor.send(message)
return self.wait_for_monitor_prompt()
def wait_for_unit(self, unit, user=None):
while True:
info = self.get_unit_info(unit, user)
state = info["ActiveState"]
if state == "failed":
raise Exception('unit "{}" reached state "{}"'.format(unit, state))
if state == "inactive":
status, jobs = self.systemctl("list-jobs --full 2>&1", user)
if "No jobs" in jobs:
info = self.get_unit_info(unit)
if info["ActiveState"] == state:
raise Exception(
(
'unit "{}" is inactive and there ' "are no pending jobs"
).format(unit)
)
if state == "active":
return True
def get_unit_info(self, unit, user=None):
status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user)
if status != 0:
return None
line_pattern = re.compile(r"^([^=]+)=(.*)$")
def tuple_from_line(line):
match = line_pattern.match(line)
return match[1], match[2]
return dict(
tuple_from_line(line)
for line in lines.split("\n")
if line_pattern.match(line)
)
def systemctl(self, q, user=None):
if user is not None:
q = q.replace("'", "\\'")
return self.execute(
(
"su -l {} -c "
"$'XDG_RUNTIME_DIR=/run/user/`id -u` "
"systemctl --user {}'"
).format(user, q)
)
return self.execute("systemctl {}".format(q))
def execute(self, command):
self.connect()
out_command = "( {} ); echo '|!EOF' $?\n".format(command)
self.shell.send(out_command.encode())
output = ""
status_code_pattern = re.compile(r"(.*)\|\!EOF\s+(\d+)")
while True:
chunk = self.shell.recv(4096).decode()
match = status_code_pattern.match(chunk)
if match:
output += match[1]
status_code = int(match[2])
return (status_code, output)
output += chunk
def succeed(self, *commands):
"""Execute each command and check that it succeeds."""
for command in commands:
with self.nested("must succeed: {}".format(command)):
status, output = self.execute(command)
if status != 0:
self.log("output: {}".format(output))
raise Exception(
"command `{}` failed (exit code {})".format(command, status)
)
return output
def fail(self, *commands):
"""Execute each command and check that it fails."""
for command in commands:
with self.nested("must fail: {}".format(command)):
status, output = self.execute(command)
if status == 0:
raise Exception(
"command `{}` unexpectedly succeeded".format(command)
)
def wait_until_succeeds(self, command):
with self.nested("waiting for success: {}".format(command)):
while True:
status, output = self.execute(command)
if status == 0:
return output
def wait_until_fails(self, command):
with self.nested("waiting for failure: {}".format(command)):
while True:
status, output = self.execute(command)
if status != 0:
return output
def wait_for_shutdown(self):
if not self.booted:
return
with self.nested("waiting for the VM to power off"):
sys.stdout.flush()
self.process.wait()
self.pid = None
self.booted = False
self.connected = False
def get_tty_text(self, tty):
status, output = self.execute(
"fold -w$(stty -F /dev/tty{0} size | "
"awk '{{print $2}}') /dev/vcs{0}".format(tty)
)
return output
def wait_until_tty_matches(self, tty, regexp):
matcher = re.compile(regexp)
with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
while True:
text = self.get_tty_text(tty)
if len(matcher.findall(text)) > 0:
return True
def send_chars(self, chars):
with self.nested("sending keys {}".format(chars)):
for char in chars:
self.send_key(char)
def wait_for_file(self, filename):
with self.nested("waiting for file {}".format(filename)):
while True:
status, _ = self.execute("test -e {}".format(filename))
if status == 0:
return True
def wait_for_open_port(self, port):
def port_is_open(_):
status, _ = self.execute("nc -z localhost {}".format(port))
return status == 0
with self.nested("waiting for TCP port {}".format(port)):
retry(port_is_open)
def wait_for_closed_port(self, port):
def port_is_closed(_):
status, _ = self.execute("nc -z localhost {}".format(port))
return status != 0
retry(port_is_closed)
def start_job(self, jobname, user=None):
return self.systemctl("start {}".format(jobname), user)
def stop_job(self, jobname, user=None):
return self.systemctl("stop {}".format(jobname), user)
def wait_for_job(self, jobname):
return self.wait_for_unit(jobname)
def connect(self):
if self.connected:
return
with self.nested("waiting for the VM to finish booting"):
self.start()
tic = time.time()
self.shell.recv(1024)
# TODO: Timeout
toc = time.time()
self.log("connected to guest root shell")
self.log("(connecting took {:.2f} seconds)".format(toc - tic))
self.connected = True
def screenshot(self, filename):
out_dir = os.environ.get("out", os.getcwd())
word_pattern = re.compile(r"^\w+$")
if word_pattern.match(filename):
filename = os.path.join(out_dir, "{}.png".format(filename))
tmp = "{}.ppm".format(filename)
with self.nested(
"making screenshot {}".format(filename),
{"image": os.path.basename(filename)},
):
self.send_monitor_command("screendump {}".format(tmp))
ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True)
os.unlink(tmp)
if ret.returncode != 0:
raise Exception("Cannot convert screenshot")
def get_screen_text(self):
if shutil.which("tesseract") is None:
raise Exception("get_screen_text used but enableOCR is false")
magick_args = (
"-filter Catrom -density 72 -resample 300 "
+ "-contrast -normalize -despeckle -type grayscale "
+ "-sharpen 1 -posterize 3 -negate -gamma 100 "
+ "-blur 1x65535"
)
tess_args = "-c debug_file=/dev/null --psm 11 --oem 2"
with self.nested("performing optical character recognition"):
with tempfile.NamedTemporaryFile() as tmpin:
self.send_monitor_command("screendump {}".format(tmpin.name))
cmd = "convert {} {} tiff:- | tesseract - - {}".format(
magick_args, tmpin.name, tess_args
)
ret = subprocess.run(cmd, shell=True, capture_output=True)
if ret.returncode != 0:
raise Exception(
"OCR failed with exit code {}".format(ret.returncode)
)
return ret.stdout.decode("utf-8")
def wait_for_text(self, regex):
def screen_matches(last):
text = self.get_screen_text()
m = re.search(regex, text)
if last and not m:
self.log("Last OCR attempt failed. Text was: {}".format(text))
return m
with self.nested("waiting for {} to appear on screen".format(regex)):
retry(screen_matches)
def send_key(self, key):
key = CHAR_TO_KEY.get(key, key)
self.send_monitor_command("sendkey {}".format(key))
def start(self):
if self.booted:
return
self.log("starting vm")
def create_socket(path):
if os.path.exists(path):
os.unlink(path)
s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
s.bind(path)
s.listen(1)
return s
monitor_path = os.path.join(self.state_dir, "monitor")
self.monitor_socket = create_socket(monitor_path)
shell_path = os.path.join(self.state_dir, "shell")
self.shell_socket = create_socket(shell_path)
qemu_options = (
" ".join(
[
"" if self.allow_reboot else "-no-reboot",
"-monitor unix:{}".format(monitor_path),
"-chardev socket,id=shell,path={}".format(shell_path),
"-device virtio-serial",
"-device virtconsole,chardev=shell",
"-device virtio-rng-pci",
"-serial stdio" if "DISPLAY" in os.environ else "-nographic",
]
)
+ " "
+ os.environ.get("QEMU_OPTS", "")
)
environment = {
"QEMU_OPTS": qemu_options,
"SHARED_DIR": self.shared_dir,
"USE_TMPDIR": "1",
}
environment.update(dict(os.environ))
self.process = subprocess.Popen(
self.script,
bufsize=1,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=False,
cwd=self.state_dir,
env=environment,
)
self.monitor, _ = self.monitor_socket.accept()
self.shell, _ = self.shell_socket.accept()
def process_serial_output():
for line in self.process.stdout:
line = line.decode().replace("\r", "").rstrip()
eprint("{} # {}".format(self.name, line))
self.logger.enqueue({"msg": line, "machine": self.name})
_thread.start_new_thread(process_serial_output, ())
self.wait_for_monitor_prompt()
self.pid = self.process.pid
self.booted = True
self.log("QEMU running (pid {})".format(self.pid))
def shutdown(self):
if self.booted:
return
self.shell.send("poweroff\n".encode())
self.wait_for_shutdown()
def crash(self):
if self.booted:
return
self.log("forced crash")
self.send_monitor_command("quit")
self.wait_for_shutdown()
def wait_for_x(self):
"""Wait until it is possible to connect to the X server. Note that
testing the existence of /tmp/.X11-unix/X0 is insufficient.
"""
with self.nested("waiting for the X11 server"):
while True:
cmd = (
"journalctl -b SYSLOG_IDENTIFIER=systemd | "
+ 'grep "Reached target Current graphical"'
)
status, _ = self.execute(cmd)
if status != 0:
continue
status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
if status == 0:
return
def sleep(self, secs):
time.sleep(secs)
def block(self):
"""Make the machine unreachable by shutting down eth1 (the multicast
interface used to talk to the other VMs). We keep eth0 up so that
the test driver can continue to talk to the machine.
"""
self.send_monitor_command("set_link virtio-net-pci.1 off")
def unblock(self):
"""Make the machine reachable.
"""
self.send_monitor_command("set_link virtio-net-pci.1 on")
def create_machine(args):
global log
args["log"] = log
args["redirectSerial"] = os.environ.get("USE_SERIAL", "0") == "1"
return Machine(args)
def start_all():
with log.nested("starting all VMs"):
for machine in machines:
machine.start()
def join_all():
with log.nested("waiting for all VMs to finish"):
for machine in machines:
machine.wait_for_shutdown()
def test_script():
exec(os.environ["testScript"])
def run_tests():
tests = os.environ.get("tests", None)
if tests is not None:
with log.nested("running the VM test script"):
try:
exec(tests)
except Exception as e:
eprint("error: {}".format(str(e)))
sys.exit(1)
else:
ptpython.repl.embed(locals(), globals())
# TODO: Collect coverage data
for machine in machines:
if machine.is_up():
machine.execute("sync")
if nr_tests != 0:
log.log("{} out of {} tests succeeded".format(nr_succeeded, nr_tests))
@contextmanager
def subtest(name):
global nr_tests
global nr_succeeded
with log.nested(name):
nr_tests += 1
try:
yield
nr_succeeded += 1
return True
except Exception as e:
log.log("error: {}".format(str(e)))
return False
if __name__ == "__main__":
global log
log = Logger()
vlan_nrs = list(dict.fromkeys(os.environ["VLANS"].split()))
vde_sockets = [create_vlan(v) for v in vlan_nrs]
for nr, vde_socket, _, _ in vde_sockets:
os.environ["QEMU_VDE_SOCKET_{}".format(nr)] = vde_socket
vm_scripts = sys.argv[1:]
machines = [create_machine({"startCommand": s}) for s in vm_scripts]
machine_eval = [
"{0} = machines[{1}]".format(m.name, idx) for idx, m in enumerate(machines)
]
exec("\n".join(machine_eval))
nr_tests = 0
nr_succeeded = 0
@atexit.register
def clean_up():
with log.nested("cleaning up"):
for machine in machines:
if machine.pid is None:
continue
log.log("killing {} (pid {})".format(machine.name, machine.pid))
machine.process.kill()
for _, _, process, _ in vde_sockets:
process.kill()
log.close()
tic = time.time()
run_tests()
toc = time.time()
print("test script finished in {:.2f}s".format(toc - tic))

View file

@ -0,0 +1,279 @@
{ system
, pkgs ? import ../.. { inherit system config; }
# Use a minimal kernel?
, minimal ? false
# Ignored
, config ? {}
# Modules to add to each VM
, extraConfigurations ? [] }:
with import ./build-vms.nix { inherit system pkgs minimal extraConfigurations; };
with pkgs;
let
jquery-ui = callPackage ./testing/jquery-ui.nix { };
jquery = callPackage ./testing/jquery.nix { };
in rec {
inherit pkgs;
testDriver = let
testDriverScript = ./test-driver/test-driver.py;
in stdenv.mkDerivation {
name = "nixos-test-driver";
nativeBuildInputs = [ makeWrapper ];
buildInputs = [ (python3.withPackages (p: [ p.ptpython ])) ];
checkInputs = with python3Packages; [ pylint black ];
dontUnpack = true;
preferLocalBuild = true;
doCheck = true;
checkPhase = ''
pylint --errors-only ${testDriverScript}
black --check --diff ${testDriverScript}
'';
installPhase =
''
mkdir -p $out/bin
cp ${testDriverScript} $out/bin/nixos-test-driver
chmod u+x $out/bin/nixos-test-driver
# TODO: copy user script part into this file (append)
wrapProgram $out/bin/nixos-test-driver \
--prefix PATH : "${lib.makeBinPath [ qemu_test vde2 netpbm coreutils ]}" \
'';
};
# Run an automated test suite in the given virtual network.
# `driver' is the script that runs the network.
runTests = driver:
stdenv.mkDerivation {
name = "vm-test-run-${driver.testName}";
requiredSystemFeatures = [ "kvm" "nixos-test" ];
buildInputs = [ libxslt ];
buildCommand =
''
mkdir -p $out/nix-support
LOGFILE=$out/log.xml tests='exec(os.environ["testScript"])' ${driver}/bin/nixos-test-driver
# Generate a pretty-printed log.
xsltproc --output $out/log.html ${./test-driver/log2html.xsl} $out/log.xml
ln -s ${./test-driver/logfile.css} $out/logfile.css
ln -s ${./test-driver/treebits.js} $out/treebits.js
ln -s ${jquery}/js/jquery.min.js $out/
ln -s ${jquery}/js/jquery.js $out/
ln -s ${jquery-ui}/js/jquery-ui.min.js $out/
ln -s ${jquery-ui}/js/jquery-ui.js $out/
touch $out/nix-support/hydra-build-products
echo "report testlog $out log.html" >> $out/nix-support/hydra-build-products
for i in */xchg/coverage-data; do
mkdir -p $out/coverage-data
mv $i $out/coverage-data/$(dirname $(dirname $i))
done
'';
};
makeTest =
{ testScript
, makeCoverageReport ? false
, enableOCR ? false
, name ? "unnamed"
, ...
} @ t:
let
# A standard store path to the vm monitor is built like this:
# /tmp/nix-build-vm-test-run-$name.drv-0/vm-state-machine/monitor
# The max filename length of a unix domain socket is 108 bytes.
# This means $name can at most be 50 bytes long.
maxTestNameLen = 50;
testNameLen = builtins.stringLength name;
testDriverName = with builtins;
if testNameLen > maxTestNameLen then
abort ("The name of the test '${name}' must not be longer than ${toString maxTestNameLen} " +
"it's currently ${toString testNameLen} characters long.")
else
"nixos-test-driver-${name}";
nodes = buildVirtualNetwork (
t.nodes or (if t ? machine then { machine = t.machine; } else { }));
testScript' =
# Call the test script with the computed nodes.
if lib.isFunction testScript
then testScript { inherit nodes; }
else testScript;
vlans = map (m: m.config.virtualisation.vlans) (lib.attrValues nodes);
vms = map (m: m.config.system.build.vm) (lib.attrValues nodes);
ocrProg = tesseract4.override { enableLanguages = [ "eng" ]; };
imagemagick_tiff = imagemagick_light.override { inherit libtiff; };
# Generate onvenience wrappers for running the test driver
# interactively with the specified network, and for starting the
# VMs from the command line.
driver = runCommand testDriverName
{ buildInputs = [ makeWrapper];
testScript = testScript';
preferLocalBuild = true;
testName = name;
}
''
mkdir -p $out/bin
echo -n "$testScript" > $out/test-script
${python3Packages.black}/bin/black --check --diff $out/test-script
ln -s ${testDriver}/bin/nixos-test-driver $out/bin/
vms=($(for i in ${toString vms}; do echo $i/bin/run-*-vm; done))
wrapProgram $out/bin/nixos-test-driver \
--add-flags "''${vms[*]}" \
${lib.optionalString enableOCR
"--prefix PATH : '${ocrProg}/bin:${imagemagick_tiff}/bin'"} \
--run "export testScript=\"\$(cat $out/test-script)\"" \
--set VLANS '${toString vlans}'
ln -s ${testDriver}/bin/nixos-test-driver $out/bin/nixos-run-vms
wrapProgram $out/bin/nixos-run-vms \
--add-flags "''${vms[*]}" \
${lib.optionalString enableOCR "--prefix PATH : '${ocrProg}/bin'"} \
--set tests 'start_all(); join_all();' \
--set VLANS '${toString vlans}' \
${lib.optionalString (builtins.length vms == 1) "--set USE_SERIAL 1"}
''; # "
passMeta = drv: drv // lib.optionalAttrs (t ? meta) {
meta = (drv.meta or {}) // t.meta;
};
test = passMeta (runTests driver);
report = passMeta (releaseTools.gcovReport { coverageRuns = [ test ]; });
nodeNames = builtins.attrNames nodes;
invalidNodeNames = lib.filter
(node: builtins.match "^[A-z_][A-z0-9_]+$" node == null) nodeNames;
in
if lib.length invalidNodeNames > 0 then
throw ''
Cannot create machines out of (${lib.concatStringsSep ", " invalidNodeNames})!
All machines are referenced as perl variables in the testing framework which will break the
script when special characters are used.
Please stick to alphanumeric chars and underscores as separation.
''
else
(if makeCoverageReport then report else test) // {
inherit nodes driver test;
};
runInMachine =
{ drv
, machine
, preBuild ? ""
, postBuild ? ""
, ... # ???
}:
let
vm = buildVM { }
[ machine
{ key = "run-in-machine";
networking.hostName = "client";
nix.readOnlyStore = false;
virtualisation.writableStore = false;
}
];
buildrunner = writeText "vm-build" ''
source $1
${coreutils}/bin/mkdir -p $TMPDIR
cd $TMPDIR
exec $origBuilder $origArgs
'';
testScript = ''
startAll;
$client->waitForUnit("multi-user.target");
${preBuild}
$client->succeed("env -i ${bash}/bin/bash ${buildrunner} /tmp/xchg/saved-env >&2");
${postBuild}
$client->succeed("sync"); # flush all data before pulling the plug
'';
vmRunCommand = writeText "vm-run" ''
xchg=vm-state-client/xchg
${coreutils}/bin/mkdir $out
${coreutils}/bin/mkdir -p $xchg
for i in $passAsFile; do
i2=''${i}Path
_basename=$(${coreutils}/bin/basename ''${!i2})
${coreutils}/bin/cp ''${!i2} $xchg/$_basename
eval $i2=/tmp/xchg/$_basename
${coreutils}/bin/ls -la $xchg
done
unset i i2 _basename
export | ${gnugrep}/bin/grep -v '^xchg=' > $xchg/saved-env
unset xchg
export tests='${testScript}'
${testDriver}/bin/nixos-test-driver ${vm.config.system.build.vm}/bin/run-*-vm
''; # */
in
lib.overrideDerivation drv (attrs: {
requiredSystemFeatures = [ "kvm" ];
builder = "${bash}/bin/sh";
args = ["-e" vmRunCommand];
origArgs = attrs.args;
origBuilder = attrs.builder;
});
runInMachineWithX = { require ? [], ... } @ args:
let
client =
{ ... }:
{
inherit require;
virtualisation.memorySize = 1024;
services.xserver.enable = true;
services.xserver.displayManager.slim.enable = false;
services.xserver.displayManager.auto.enable = true;
services.xserver.windowManager.default = "icewm";
services.xserver.windowManager.icewm.enable = true;
services.xserver.desktopManager.default = "none";
};
in
runInMachine ({
machine = client;
preBuild =
''
$client->waitForX;
'';
} // args);
simpleTest = as: (makeTest as).test;
}

View file

@ -1,6 +1,6 @@
let
commonConfig = ./common/letsencrypt/common.nix;
in import ./make-test.nix {
in import ./make-test-python.nix {
name = "acme";
nodes = rec {
@ -90,39 +90,44 @@ in import ./make-test.nix {
newServerSystem = nodes.webserver2.config.system.build.toplevel;
switchToNewServer = "${newServerSystem}/bin/switch-to-configuration test";
in
# Note, waitForUnit does not work for oneshot services that do not have RemainAfterExit=true,
# Note, wait_for_unit does not work for oneshot services that do not have RemainAfterExit=true,
# this is because a oneshot goes from inactive => activating => inactive, and never
# reaches the active state. To work around this, we create some mock target units which
# get pulled in by the oneshot units. The target units linger after activation, and hence we
# can use them to probe that a oneshot fired. It is a bit ugly, but it is the best we can do
''
$client->start;
$letsencrypt->start;
$acmeStandalone->start;
client.start()
letsencrypt.start()
acmeStandalone.start()
$letsencrypt->waitForUnit("default.target");
$letsencrypt->waitForUnit("pebble.service");
letsencrypt.wait_for_unit("default.target")
letsencrypt.wait_for_unit("pebble.service")
subtest "can request certificate with HTTPS-01 challenge", sub {
$acmeStandalone->waitForUnit("default.target");
$acmeStandalone->succeed("systemctl start acme-standalone.com.service");
$acmeStandalone->waitForUnit("acme-finished-standalone.com.target");
};
with subtest("can request certificate with HTTPS-01 challenge"):
acmeStandalone.wait_for_unit("default.target")
acmeStandalone.succeed("systemctl start acme-standalone.com.service")
acmeStandalone.wait_for_unit("acme-finished-standalone.com.target")
$client->waitForUnit("default.target");
client.wait_for_unit("default.target")
$client->succeed('curl https://acme-v02.api.letsencrypt.org:15000/roots/0 > /tmp/ca.crt');
$client->succeed('curl https://acme-v02.api.letsencrypt.org:15000/intermediate-keys/0 >> /tmp/ca.crt');
client.succeed("curl https://acme-v02.api.letsencrypt.org:15000/roots/0 > /tmp/ca.crt")
client.succeed(
"curl https://acme-v02.api.letsencrypt.org:15000/intermediate-keys/0 >> /tmp/ca.crt"
)
subtest "Can request certificate for nginx service", sub {
$webserver->waitForUnit("acme-finished-a.example.com.target");
$client->succeed('curl --cacert /tmp/ca.crt https://a.example.com/ | grep -qF "hello world"');
};
with subtest("Can request certificate for nginx service"):
webserver.wait_for_unit("acme-finished-a.example.com.target")
client.succeed(
"curl --cacert /tmp/ca.crt https://a.example.com/ | grep -qF 'hello world'"
)
subtest "Can add another certificate for nginx service", sub {
$webserver->succeed("/run/current-system/fine-tune/child-1/bin/switch-to-configuration test");
$webserver->waitForUnit("acme-finished-b.example.com.target");
$client->succeed('curl --cacert /tmp/ca.crt https://b.example.com/ | grep -qF "hello world"');
};
with subtest("Can add another certificate for nginx service"):
webserver.succeed(
"/run/current-system/fine-tune/child-1/bin/switch-to-configuration test"
)
webserver.wait_for_unit("acme-finished-b.example.com.target")
client.succeed(
"curl --cacert /tmp/ca.crt https://b.example.com/ | grep -qF 'hello world'"
)
'';
}

View file

@ -1,4 +1,4 @@
import ./make-test.nix ({ pkgs, ...} : {
import ./make-test-python.nix ({ pkgs, ...} : {
name = "ammonite";
meta = with pkgs.stdenv.lib.maintainers; {
maintainers = [ nequissimus ];
@ -13,8 +13,8 @@ import ./make-test.nix ({ pkgs, ...} : {
};
testScript = ''
startAll;
start_all()
$amm->succeed("amm -c 'val foo = 21; println(foo * 2)' | grep 42")
amm.succeed("amm -c 'val foo = 21; println(foo * 2)' | grep 42")
'';
})

View file

@ -1,4 +1,4 @@
import ./make-test.nix ({ pkgs, lib, ... }:
import ./make-test-python.nix ({ pkgs, lib, ... }:
{
name = "automysqlbackup";
@ -15,20 +15,24 @@ import ./make-test.nix ({ pkgs, lib, ... }:
};
testScript = ''
startAll;
start_all()
# Need to have mysql started so that it can be populated with data.
$machine->waitForUnit("mysql.service");
machine.wait_for_unit("mysql.service")
# Wait for testdb to be fully populated (5 rows).
$machine->waitUntilSucceeds("mysql -u root -D testdb -N -B -e 'select count(id) from tests' | grep -q 5");
with subtest("Wait for testdb to be fully populated (5 rows)."):
machine.wait_until_succeeds(
"mysql -u root -D testdb -N -B -e 'select count(id) from tests' | grep -q 5"
)
# Do a backup and wait for it to start
$machine->startJob("automysqlbackup.service");
$machine->waitForJob("automysqlbackup.service");
with subtest("Do a backup and wait for it to start"):
machine.start_job("automysqlbackup.service")
machine.wait_for_job("automysqlbackup.service")
# wait for backup file and check that data appears in backup
$machine->waitForFile("/var/backup/mysql/daily/testdb");
$machine->succeed("${pkgs.gzip}/bin/zcat /var/backup/mysql/daily/testdb/daily_testdb_*.sql.gz | grep hello");
with subtest("wait for backup file and check that data appears in backup"):
machine.wait_for_file("/var/backup/mysql/daily/testdb")
machine.succeed(
"${pkgs.gzip}/bin/zcat /var/backup/mysql/daily/testdb/daily_testdb_*.sql.gz | grep hello"
)
'';
})

View file

@ -6,7 +6,7 @@
# which only works if the first client successfully uses the UPnP-IGD
# protocol to poke a hole in the NAT.
import ./make-test.nix ({ pkgs, ... }:
import ./make-test-python.nix ({ pkgs, ... }:
let
@ -108,42 +108,56 @@ in
testScript =
{ nodes, ... }:
''
startAll;
start_all()
# Wait for network and miniupnpd.
$router->waitForUnit("network-online.target");
$router->waitForUnit("miniupnpd");
router.wait_for_unit("network-online.target")
router.wait_for_unit("miniupnpd")
# Create the torrent.
$tracker->succeed("mkdir /tmp/data");
$tracker->succeed("cp ${file} /tmp/data/test.tar.bz2");
$tracker->succeed("transmission-create /tmp/data/test.tar.bz2 --private --tracker http://${externalTrackerAddress}:6969/announce --outfile /tmp/test.torrent");
$tracker->succeed("chmod 644 /tmp/test.torrent");
tracker.succeed("mkdir /tmp/data")
tracker.succeed(
"cp ${file} /tmp/data/test.tar.bz2"
)
tracker.succeed(
"transmission-create /tmp/data/test.tar.bz2 --private --tracker http://${externalTrackerAddress}:6969/announce --outfile /tmp/test.torrent"
)
tracker.succeed("chmod 644 /tmp/test.torrent")
# Start the tracker. !!! use a less crappy tracker
$tracker->waitForUnit("network-online.target");
$tracker->waitForUnit("opentracker.service");
$tracker->waitForOpenPort(6969);
tracker.wait_for_unit("network-online.target")
tracker.wait_for_unit("opentracker.service")
tracker.wait_for_open_port(6969)
# Start the initial seeder.
$tracker->succeed("transmission-remote --add /tmp/test.torrent --no-portmap --no-dht --download-dir /tmp/data");
tracker.succeed(
"transmission-remote --add /tmp/test.torrent --no-portmap --no-dht --download-dir /tmp/data"
)
# Now we should be able to download from the client behind the NAT.
$tracker->waitForUnit("httpd");
$client1->waitForUnit("network-online.target");
$client1->succeed("transmission-remote --add http://${externalTrackerAddress}/test.torrent --download-dir /tmp >&2 &");
$client1->waitForFile("/tmp/test.tar.bz2");
$client1->succeed("cmp /tmp/test.tar.bz2 ${file}");
tracker.wait_for_unit("httpd")
client1.wait_for_unit("network-online.target")
client1.succeed(
"transmission-remote --add http://${externalTrackerAddress}/test.torrent --download-dir /tmp >&2 &"
)
client1.wait_for_file("/tmp/test.tar.bz2")
client1.succeed(
"cmp /tmp/test.tar.bz2 ${file}"
)
# Bring down the initial seeder.
# $tracker->stopJob("transmission");
# tracker.stop_job("transmission")
# Now download from the second client. This can only succeed if
# the first client created a NAT hole in the router.
$client2->waitForUnit("network-online.target");
$client2->succeed("transmission-remote --add http://${externalTrackerAddress}/test.torrent --no-portmap --no-dht --download-dir /tmp >&2 &");
$client2->waitForFile("/tmp/test.tar.bz2");
$client2->succeed("cmp /tmp/test.tar.bz2 ${file}");
client2.wait_for_unit("network-online.target")
client2.succeed(
"transmission-remote --add http://${externalTrackerAddress}/test.torrent --no-portmap --no-dht --download-dir /tmp >&2 &"
)
client2.wait_for_file("/tmp/test.tar.bz2")
client2.succeed(
"cmp /tmp/test.tar.bz2 ${file}"
)
'';
})

View file

@ -3,7 +3,7 @@
pkgs ? import ../.. { inherit system config; }
}:
with import ../lib/testing.nix { inherit system pkgs; };
with import ../lib/testing-python.nix { inherit system pkgs; };
with pkgs.lib;
let
@ -17,11 +17,11 @@ let
];
}).config.system.build.isoImage;
perlAttrs = params: "{ ${concatStringsSep ", " (mapAttrsToList (name: param: "${name} => ${builtins.toJSON param}") params)} }";
pythonDict = params: "\n {\n ${concatStringsSep ",\n " (mapAttrsToList (name: param: "\"${name}\": \"${param}\"") params)},\n }\n";
makeBootTest = name: extraConfig:
let
machineConfig = perlAttrs ({ qemuFlags = "-m 768"; } // extraConfig);
machineConfig = pythonDict ({ qemuFlags = "-m 768"; } // extraConfig);
in
makeTest {
inherit iso;
@ -29,16 +29,16 @@ let
nodes = { };
testScript =
''
my $machine = createMachine(${machineConfig});
$machine->start;
$machine->waitForUnit("multi-user.target");
$machine->succeed("nix verify -r --no-trust /run/current-system");
machine = create_machine(${machineConfig})
machine.start()
machine.wait_for_unit("multi-user.target")
machine.succeed("nix verify -r --no-trust /run/current-system")
# Test whether the channel got installed correctly.
$machine->succeed("nix-instantiate --dry-run '<nixpkgs>' -A hello");
$machine->succeed("nix-env --dry-run -iA nixos.procps");
with subtest("Check whether the channel got installed correctly"):
machine.succeed("nix-instantiate --dry-run '<nixpkgs>' -A hello")
machine.succeed("nix-env --dry-run -iA nixos.procps")
$machine->shutdown;
machine.shutdown()
'';
};
@ -60,7 +60,7 @@ let
config.system.build.netbootIpxeScript
];
};
machineConfig = perlAttrs ({
machineConfig = pythonDict ({
qemuFlags = "-boot order=n -m 2000";
netBackendArgs = "tftp=${ipxeBootDir},bootfile=netboot.ipxe";
} // extraConfig);
@ -68,12 +68,11 @@ let
makeTest {
name = "boot-netboot-" + name;
nodes = { };
testScript =
''
my $machine = createMachine(${machineConfig});
$machine->start;
$machine->waitForUnit("multi-user.target");
$machine->shutdown;
testScript = ''
machine = create_machine(${machineConfig})
machine.start()
machine.wait_for_unit("multi-user.target")
machine.shutdown()
'';
};
in {

View file

@ -1,4 +1,4 @@
import ./make-test.nix ({ pkgs, ...} : {
import ./make-test-python.nix ({ pkgs, ...} : {
name = "emacs-daemon";
meta = with pkgs.stdenv.lib.maintainers; {
maintainers = [ ];
@ -21,25 +21,28 @@ import ./make-test.nix ({ pkgs, ...} : {
environment.variables.TEST_SYSTEM_VARIABLE = "system variable";
};
testScript =
''
$machine->waitForUnit("multi-user.target");
testScript = ''
machine.wait_for_unit("multi-user.target")
# checks that the EDITOR environment variable is set
$machine->succeed("test \$(basename \"\$EDITOR\") = emacseditor");
machine.succeed('test $(basename "$EDITOR") = emacseditor')
# waits for the emacs service to be ready
$machine->waitUntilSucceeds("systemctl --user status emacs.service | grep 'Active: active'");
machine.wait_until_succeeds(
"systemctl --user status emacs.service | grep 'Active: active'"
)
# connects to the daemon
$machine->succeed("emacsclient --create-frame \$EDITOR &");
machine.succeed("emacsclient --create-frame $EDITOR &")
# checks that Emacs shows the edited filename
$machine->waitForText("emacseditor");
machine.wait_for_text("emacseditor")
# makes sure environment variables are accessible from Emacs
$machine->succeed("emacsclient --eval '(getenv \"TEST_SYSTEM_VARIABLE\")'") =~ /system variable/ or die;
machine.succeed(
"emacsclient --eval '(getenv \"TEST_SYSTEM_VARIABLE\")' | grep -q 'system variable'"
)
$machine->screenshot("emacsclient");
machine.screenshot("emacsclient")
'';
})

View file

@ -3,7 +3,7 @@
pkgs ? import ../.. { inherit system config; }
}:
with import ../lib/testing.nix { inherit system pkgs; };
with import ../lib/testing-python.nix { inherit system pkgs; };
with pkgs.lib;
{
@ -18,11 +18,11 @@ with pkgs.lib;
};
testScript = ''
startAll;
start_all()
$machine->waitForUnit('gitea.service');
$machine->waitForOpenPort('3000');
$machine->succeed("curl --fail http://localhost:3000/");
machine.wait_for_unit("gitea.service")
machine.wait_for_open_port(3000)
machine.succeed("curl --fail http://localhost:3000/")
'';
};
@ -37,11 +37,11 @@ with pkgs.lib;
};
testScript = ''
startAll;
start_all()
$machine->waitForUnit('gitea.service');
$machine->waitForOpenPort('3000');
$machine->succeed("curl --fail http://localhost:3000/");
machine.wait_for_unit("gitea.service")
machine.wait_for_open_port(3000)
machine.succeed("curl --fail http://localhost:3000/")
'';
};
@ -56,12 +56,14 @@ with pkgs.lib;
};
testScript = ''
startAll;
start_all()
$machine->waitForUnit('gitea.service');
$machine->waitForOpenPort('3000');
$machine->succeed("curl --fail http://localhost:3000/");
$machine->succeed("curl --fail http://localhost:3000/user/sign_up | grep 'Registration is disabled. Please contact your site administrator.'");
machine.wait_for_unit("gitea.service")
machine.wait_for_open_port(3000)
machine.succeed("curl --fail http://localhost:3000/")
machine.succeed(
"curl --fail http://localhost:3000/user/sign_up | grep 'Registration is disabled. Please contact your site administrator.'"
)
'';
};
}

View file

@ -1,4 +1,4 @@
import ./make-test.nix ({ pkgs, latestKernel ? false, ... }:
import ./make-test-python.nix ({ pkgs, latestKernel ? false, ... }:
{
name = "login";
@ -12,62 +12,48 @@ import ./make-test.nix ({ pkgs, latestKernel ? false, ... }:
sound.enable = true; # needed for the factl test, /dev/snd/* exists without them but udev doesn't care then
};
testScript =
''
$machine->waitForUnit('multi-user.target');
$machine->waitUntilSucceeds("pgrep -f 'agetty.*tty1'");
$machine->screenshot("postboot");
testScript = ''
machine.wait_for_unit("multi-user.target")
machine.wait_until_succeeds("pgrep -f 'agetty.*tty1'")
machine.screenshot("postboot")
subtest "create user", sub {
$machine->succeed("useradd -m alice");
$machine->succeed("(echo foobar; echo foobar) | passwd alice");
};
with subtest("create user"):
machine.succeed("useradd -m alice")
machine.succeed("(echo foobar; echo foobar) | passwd alice")
# Check whether switching VTs works.
subtest "virtual console switching", sub {
$machine->fail("pgrep -f 'agetty.*tty2'");
$machine->sendKeys("alt-f2");
$machine->waitUntilSucceeds("[ \$(fgconsole) = 2 ]");
$machine->waitForUnit('getty@tty2.service');
$machine->waitUntilSucceeds("pgrep -f 'agetty.*tty2'");
};
with subtest("Check whether switching VTs works"):
machine.fail("pgrep -f 'agetty.*tty2'")
machine.send_key("alt-f2")
machine.wait_until_succeeds("[ $(fgconsole) = 2 ]")
machine.wait_for_unit("getty@tty2.service")
machine.wait_until_succeeds("pgrep -f 'agetty.*tty2'")
# Log in as alice on a virtual console.
subtest "virtual console login", sub {
$machine->waitUntilTTYMatches(2, "login: ");
$machine->sendChars("alice\n");
$machine->waitUntilTTYMatches(2, "login: alice");
$machine->waitUntilSucceeds("pgrep login");
$machine->waitUntilTTYMatches(2, "Password: ");
$machine->sendChars("foobar\n");
$machine->waitUntilSucceeds("pgrep -u alice bash");
$machine->sendChars("touch done\n");
$machine->waitForFile("/home/alice/done");
};
with subtest("Log in as alice on a virtual console"):
machine.wait_until_tty_matches(2, "login: ")
machine.send_chars("alice\n")
machine.wait_until_tty_matches(2, "login: alice")
machine.wait_until_succeeds("pgrep login")
machine.wait_until_tty_matches(2, "Password: ")
machine.send_chars("foobar\n")
machine.wait_until_succeeds("pgrep -u alice bash")
machine.send_chars("touch done\n")
machine.wait_for_file("/home/alice/done")
# Check whether systemd gives and removes device ownership as
# needed.
subtest "device permissions", sub {
$machine->succeed("getfacl -p /dev/snd/timer | grep -q alice");
$machine->sendKeys("alt-f1");
$machine->waitUntilSucceeds("[ \$(fgconsole) = 1 ]");
$machine->fail("getfacl -p /dev/snd/timer | grep -q alice");
$machine->succeed("chvt 2");
$machine->waitUntilSucceeds("getfacl -p /dev/snd/timer | grep -q alice");
};
with subtest("Systemd gives and removes device ownership as needed"):
machine.succeed("getfacl /dev/snd/timer | grep -q alice")
machine.send_key("alt-f1")
machine.wait_until_succeeds("[ $(fgconsole) = 1 ]")
machine.fail("getfacl /dev/snd/timer | grep -q alice")
machine.succeed("chvt 2")
machine.wait_until_succeeds("getfacl /dev/snd/timer | grep -q alice")
# Log out.
subtest "virtual console logout", sub {
$machine->sendChars("exit\n");
$machine->waitUntilFails("pgrep -u alice bash");
$machine->screenshot("mingetty");
};
# Check whether ctrl-alt-delete works.
subtest "ctrl-alt-delete", sub {
$machine->sendKeys("ctrl-alt-delete");
$machine->waitForShutdown;
};
'';
with subtest("Virtual console logout"):
machine.send_chars("exit\n")
machine.wait_until_fails("pgrep -u alice bash")
machine.screenshot("mingetty")
with subtest("Check whether ctrl-alt-delete works"):
machine.send_key("ctrl-alt-delete")
machine.wait_for_shutdown()
'';
})

View file

@ -0,0 +1,9 @@
f: {
system ? builtins.currentSystem,
pkgs ? import ../.. { inherit system; config = {}; },
...
} @ args:
with import ../lib/testing-python.nix { inherit system pkgs; };
makeTest (if pkgs.lib.isFunction f then f (args // { inherit pkgs; inherit (pkgs) lib; }) else f)

View file

@ -3,7 +3,7 @@
pkgs ? import ../.. { inherit system config; }
}:
with import ../lib/testing.nix { inherit system pkgs; };
with import ../lib/testing-python.nix { inherit system pkgs; };
with pkgs.lib;
let
@ -40,29 +40,33 @@ let
backupName = if backup-all then "all" else "postgres";
backupService = if backup-all then "postgresqlBackup" else "postgresqlBackup-postgres";
in ''
sub check_count {
my ($select, $nlines) = @_;
return 'test $(sudo -u postgres psql postgres -tAc "' . $select . '"|wc -l) -eq ' . $nlines;
}
def check_count(statement, lines):
return 'test $(sudo -u postgres psql postgres -tAc "{}"|wc -l) -eq {}'.format(
statement, lines
)
machine.start()
machine.wait_for_unit("postgresql")
$machine->start;
$machine->waitForUnit("postgresql");
# postgresql should be available just after unit start
$machine->succeed("cat ${test-sql} | sudo -u postgres psql");
$machine->shutdown; # make sure that postgresql survive restart (bug #1735)
sleep(2);
$machine->start;
$machine->waitForUnit("postgresql");
$machine->fail(check_count("SELECT * FROM sth;", 3));
$machine->succeed(check_count("SELECT * FROM sth;", 5));
$machine->fail(check_count("SELECT * FROM sth;", 4));
$machine->succeed(check_count("SELECT xpath(\'/test/text()\', doc) FROM xmltest;", 1));
machine.succeed(
"cat ${test-sql} | sudo -u postgres psql"
)
machine.shutdown() # make sure that postgresql survive restart (bug #1735)
time.sleep(2)
machine.start()
machine.wait_for_unit("postgresql")
machine.fail(check_count("SELECT * FROM sth;", 3))
machine.succeed(check_count("SELECT * FROM sth;", 5))
machine.fail(check_count("SELECT * FROM sth;", 4))
machine.succeed(check_count("SELECT xpath('/test/text()', doc) FROM xmltest;", 1))
# Check backup service
$machine->succeed("systemctl start ${backupService}.service");
$machine->succeed("zcat /var/backup/postgresql/${backupName}.sql.gz | grep '<test>ok</test>'");
$machine->succeed("stat -c '%a' /var/backup/postgresql/${backupName}.sql.gz | grep 600");
$machine->shutdown;
machine.succeed("systemctl start ${backupService}.service")
machine.succeed("zcat /var/backup/postgresql/${backupName}.sql.gz | grep '<test>ok</test>'")
machine.succeed("stat -c '%a' /var/backup/postgresql/${backupName}.sql.gz | grep 600")
machine.shutdown()
'';
};

View file

@ -1,4 +1,4 @@
import ./make-test.nix ({ pkgs, ...} :
import ./make-test-python.nix ({ pkgs, ...} :
let
@ -59,37 +59,37 @@ rec {
testScript =
''
startAll;
start_all()
$server->waitForUnit("quake3-server");
$client1->waitForX;
$client2->waitForX;
server.wait_for_unit("quake3-server")
client1.wait_for_x()
client2.wait_for_x()
$client1->execute("quake3 +set r_fullscreen 0 +set name Foo +connect server &");
$client2->execute("quake3 +set r_fullscreen 0 +set name Bar +connect server &");
client1.execute("quake3 +set r_fullscreen 0 +set name Foo +connect server &")
client2.execute("quake3 +set r_fullscreen 0 +set name Bar +connect server &")
$server->waitUntilSucceeds("grep -q 'Foo.*entered the game' /tmp/log");
$server->waitUntilSucceeds("grep -q 'Bar.*entered the game' /tmp/log");
server.wait_until_succeeds("grep -q 'Foo.*entered the game' /tmp/log")
server.wait_until_succeeds("grep -q 'Bar.*entered the game' /tmp/log")
$server->sleep(10); # wait for a while to get a nice screenshot
server.sleep(10) # wait for a while to get a nice screenshot
$client1->block();
client1.block()
$server->sleep(20);
server.sleep(20)
$client1->screenshot("screen1");
$client2->screenshot("screen2");
client1.screenshot("screen1")
client2.screenshot("screen2")
$client1->unblock();
client1.unblock()
$server->sleep(10);
server.sleep(10)
$client1->screenshot("screen3");
$client2->screenshot("screen4");
client1.screenshot("screen3")
client2.screenshot("screen4")
$client1->shutdown();
$client2->shutdown();
$server->stopJob("quake3-server");
client1.shutdown()
client2.shutdown()
server.stop_job("quake3-server")
'';
})

View file

@ -2,7 +2,7 @@ let
wg-snakeoil-keys = import ./snakeoil-keys.nix;
in
import ../make-test.nix ({ pkgs, ...} : {
import ../make-test-python.nix ({ pkgs, ...} : {
name = "wireguard";
meta = with pkgs.stdenv.lib.maintainers; {
maintainers = [ ma27 ];
@ -86,12 +86,12 @@ import ../make-test.nix ({ pkgs, ...} : {
};
testScript = ''
startAll;
start_all()
$peer0->waitForUnit("wireguard-wg0.service");
$peer1->waitForUnit("wireguard-wg0.service");
peer0.wait_for_unit("wireguard-wg0.service")
peer1.wait_for_unit("wireguard-wg0.service")
$peer1->succeed("ping -c5 fc00::1");
$peer1->succeed("ping -c5 10.23.42.1")
peer1.succeed("ping -c5 fc00::1")
peer1.succeed("ping -c5 10.23.42.1")
'';
})

View file

@ -1,4 +1,4 @@
import ../make-test.nix ({ pkgs, ...} : {
import ../make-test-python.nix ({ pkgs, ...} : {
name = "wireguard-generated";
meta = with pkgs.stdenv.lib.maintainers; {
maintainers = [ ma27 grahamc ];
@ -28,30 +28,34 @@ import ../make-test.nix ({ pkgs, ...} : {
};
testScript = ''
startAll;
start_all()
$peer1->waitForUnit("wireguard-wg0.service");
$peer2->waitForUnit("wireguard-wg0.service");
peer1.wait_for_unit("wireguard-wg0.service")
peer2.wait_for_unit("wireguard-wg0.service")
my ($retcode, $peer1pubkey) = $peer1->execute("wg pubkey < /etc/wireguard/private");
$peer1pubkey =~ s/\s+$//;
if ($retcode != 0) {
die "Could not read public key from peer1";
}
retcode, peer1pubkey = peer1.execute("wg pubkey < /etc/wireguard/private")
if retcode != 0:
raise Exception("Could not read public key from peer1")
my ($retcode, $peer2pubkey) = $peer2->execute("wg pubkey < /etc/wireguard/private");
$peer2pubkey =~ s/\s+$//;
if ($retcode != 0) {
die "Could not read public key from peer2";
}
retcode, peer2pubkey = peer2.execute("wg pubkey < /etc/wireguard/private")
if retcode != 0:
raise Exception("Could not read public key from peer2")
$peer1->succeed("wg set wg0 peer $peer2pubkey allowed-ips 10.10.10.2/32 endpoint 192.168.1.2:12345 persistent-keepalive 1");
$peer1->succeed("ip route replace 10.10.10.2/32 dev wg0 table main");
peer1.succeed(
"wg set wg0 peer {} allowed-ips 10.10.10.2/32 endpoint 192.168.1.2:12345 persistent-keepalive 1".format(
peer2pubkey.strip()
)
)
peer1.succeed("ip route replace 10.10.10.2/32 dev wg0 table main")
$peer2->succeed("wg set wg0 peer $peer1pubkey allowed-ips 10.10.10.1/32 endpoint 192.168.1.1:12345 persistent-keepalive 1");
$peer2->succeed("ip route replace 10.10.10.1/32 dev wg0 table main");
peer2.succeed(
"wg set wg0 peer {} allowed-ips 10.10.10.1/32 endpoint 192.168.1.1:12345 persistent-keepalive 1".format(
peer1pubkey.strip()
)
)
peer2.succeed("ip route replace 10.10.10.1/32 dev wg0 table main")
$peer1->succeed("ping -c1 10.10.10.2");
$peer2->succeed("ping -c1 10.10.10.1");
peer1.succeed("ping -c1 10.10.10.2")
peer2.succeed("ping -c1 10.10.10.1")
'';
})

View file

@ -7,7 +7,7 @@ with import ../lib/testing.nix { inherit system pkgs; };
let
makeTest = import ./make-test.nix;
makeTest = import ./make-test-python.nix;
makeZfsTest = name:
{ kernelPackage ? pkgs.linuxPackages_latest
@ -34,12 +34,12 @@ let
};
testScript = ''
$machine->succeed("modprobe zfs");
$machine->succeed("zpool status");
machine.succeed("modprobe zfs")
machine.succeed("zpool status")
$machine->succeed("ls /dev");
machine.succeed("ls /dev")
$machine->succeed(
machine.succeed(
"mkdir /tmp/mnt",
"udevadm settle",
@ -55,9 +55,7 @@ let
"umount /tmp/mnt",
"zpool destroy rpool",
"udevadm settle"
);
)
'' + extraTest;
};
@ -70,8 +68,8 @@ in {
unstable = makeZfsTest "unstable" {
enableUnstable = true;
extraTest = ''
$machine->succeed(
"echo password | zpool create -o altroot='/tmp/mnt' -O encryption=aes-256-gcm -O keyformat=passphrase rpool /dev/vdb1",
machine.succeed(
"echo password | zpool create -o altroot=\"/tmp/mnt\" -O encryption=aes-256-gcm -O keyformat=passphrase rpool /dev/vdb1",
"zfs create -o mountpoint=legacy rpool/root",
"mount -t zfs rpool/root /tmp/mnt",
"udevadm settle",
@ -79,7 +77,7 @@ in {
"umount /tmp/mnt",
"zpool destroy rpool",
"udevadm settle"
);
)
'';
};