#!/usr/bin/env python3 # # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). # You may not use this file except in compliance with the License. # A copy of the License is located at # # http://www.apache.org/licenses/LICENSE-2.0 # # or in the "license" file accompanying this file. This file is distributed # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either # express or implied. See the License for the specific language governing # permissions and limitations under the License. import logging import os import pathlib import re import subprocess import sys import uuid def run_cmd(cmd): try: subprocess.run([str(c) for c in cmd], check=True) except subprocess.CalledProcessError: logging.error("Failed to run command '%s'", " ".join(cmd)) sys.exit(1) def litani_add(litani, counter, *args, **kwargs): cmd = [litani, "add-job"] for arg in args: switch = re.sub("_", "-", arg) cmd.append(f"--{switch}") for arg, value in kwargs.items(): switch = re.sub("_", "-", arg) cmd.append(f"--{switch}") if isinstance(value, list): cmd.extend(value) else: cmd.append(value) run_cmd(cmd) counter["added"] += 1 print_counter(counter) def collapse(string): return re.sub(r"\s+", " ", string) def add_e2e_tests(litani, test_dir, root_dir, counter): e2e_test_dir = test_dir / "e2e" # 4 jobs per test (init, add-jobs, run-build, check-run) # skip __init__.py and __pycache__ counter["total"] += (len(os.listdir(e2e_test_dir / "tests")) - 2) * 4 for test_file in (e2e_test_dir / "tests").iterdir(): if test_file.name in ["__init__.py", "__pycache__"]: continue run_dir = e2e_test_dir / "output" / str(uuid.uuid4()) litani_add( litani, counter, command=collapse(f""" {e2e_test_dir / 'run'} --test-file {test_file} --litani {litani} --run-dir {run_dir} --operation init"""), pipeline=f"End-to-end: {test_file.stem}", ci_stage="test", description=f"{test_file.stem}: init", outputs=run_dir / ".litani_cache_dir", cwd=run_dir) litani_add( litani, counter, command=collapse(f""" {e2e_test_dir / 'run'} --test-file {test_file} --litani {litani} --run-dir {run_dir} --operation add-jobs"""), pipeline=f"End-to-end: {test_file.stem}", ci_stage="test", description=f"{test_file.stem}: add jobs", inputs=run_dir / ".litani_cache_dir", outputs=f"{run_dir}/output/jobs", cwd=run_dir) litani_add( litani, counter, command=collapse(f""" {e2e_test_dir / 'run'} --test-file {test_file} --litani {litani} --run-dir {run_dir} --operation run-build"""), pipeline=f"End-to-end: {test_file.stem}", ci_stage="test", description=f"{test_file.stem}: run build", inputs=f"{run_dir}/output/jobs", outputs=f"{run_dir}/output/run.json", cwd=run_dir) litani_add( litani, counter, command=collapse(f""" {e2e_test_dir / 'run'} --test-file {test_file} --litani {litani} --run-dir {run_dir} --operation check-run"""), pipeline=f"End-to-end: {test_file.stem}", ci_stage="report", description=f"{test_file.stem}: check run", inputs=f"{run_dir}/output/run.json", cwd=run_dir) def add_unit_tests(litani, test_dir, root_dir, counter): for fyle in (test_dir / "unit").iterdir(): if fyle.name in ["__init__.py", "__pycache__"]: continue litani_add( litani, counter, command=f"python3 -m unittest test.unit.{fyle.stem}", pipeline="Unit tests", ci_stage="test", description=fyle.stem, cwd=root_dir) counter["total"] += 1 def print_counter(counter): print("\r{added} / {total} tests added".format(**counter), end="") def main(): logging.basicConfig(format="run-tests: %(message)s") test_dir = pathlib.Path(__file__).resolve().parent root = test_dir.parent litani = root / "litani" run_cmd([ litani, "init", "--project", "Litani Test Suite", "--output-prefix", test_dir / "output", "--output-symlink", test_dir / "output" / "latest"]) counter = { "added": 0, "total": 0, } add_unit_tests(litani, test_dir, root, counter) add_e2e_tests(litani, test_dir, root, counter) print() run_cmd([litani, "run-build"]) if __name__ == "__main__": main()