# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import dataclasses
import datetime
import enum
import functools
import json
import logging
import os
import pathlib
import shutil
import subprocess
import sys
import uuid

import jinja2

from lib import litani
import lib.graph


class Gnuplot:
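    """Thin wrapper around the gnuplot binary, which may not be installed;
    callers should check should_render() before calling render()."""
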
def __init__(self):
self._should_render = shutil.which("gnuplot") is not None

    def should_render(self):
return self._should_render

    def render(self, gnu_file, out_file=None):
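        """Render a gnuplot file to SVG.

        Return the SVG output as a list of lines, with the XML declaration
        stripped so that the SVG can be inlined into an HTML page. If
        out_file is given, also write the SVG there atomically.
        """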
if not self.should_render():
raise UserWarning(
"Should not call Gnuplot.render() if should_render() is False")
cmd = ["gnuplot"]
with subprocess.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, text=True) as proc:
out, _ = proc.communicate(input=gnu_file)
if proc.returncode:
logging.error("Failed to render gnuplot file:")
logging.error(gnu_file)
sys.exit(1)
            lines = [
                line for line in out.splitlines()
                if "<?xml version" not in line
            ]
if out_file:
with litani.atomic_write(out_file) as handle:
print("\n".join(lines), file=handle)
return lines


# ______________________________________________________________________________
# Job renderers
# ``````````````````````````````````````````````````````````````````````````````
# Each of these classes takes a single job, stage, or pipeline and renders an
# auxiliary page that is linked to from the pipeline page. The classes then
# add the URL of the rendered page to the job dict.
# ______________________________________________________________________________
class PipelineDepgraphRenderer:
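    """Renders a pipeline's dependency graph to SVG using Graphviz's dot."""
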
def __init__(self, should_render=True):
is_graphviz_installed = shutil.which("dot") is not None
if not is_graphviz_installed:
logging.info(
"Graphviz is not installed; pipeline dependency "
"graph cannot be rendered")
self.should_render = should_render and is_graphviz_installed

    def render_to_file(self, out_file, pipe):
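        """Pipe the pipeline's DOT graph through `dot -Tsvg` into out_file."""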
dot_graph = lib.graph.SinglePipelineGraph.render(pipe)
out_file.parent.mkdir(exist_ok=True, parents=True)
with open(out_file, "w") as handle:
with subprocess.Popen(
["dot", "-Tsvg"], text=True, stdin=subprocess.PIPE,
stdout=handle) as proc:
proc.communicate(input=dot_graph)
if proc.returncode:
logging.error("Failed to run dot. Graph: ")
logging.error(dot_graph)
sys.exit(1)
return True

    def render(self, render_root, pipe_url, pipe):
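        """Render the dependency graph beside the pipeline page and record
        its relative URL on the pipe dict."""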
if not self.should_render:
return
out_rel = pipe_url / "dependencies.svg"
out_file = render_root / out_rel
success = self.render_to_file(out_file=out_file, pipe=pipe)
if success:
pipe["dependencies_url"] = "dependencies.svg"


@dataclasses.dataclass
class MemoryTraceRenderer:
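    """Renders an inline SVG preview of a job's memory trace."""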
jinja_env: jinja2.Environment
gnuplot: Gnuplot

    @staticmethod
def should_render(job):
return job.get("memory_trace") \
and job["memory_trace"].get("trace") \
and len(job["memory_trace"]["trace"]) > 2

    def render_preview(self, job):
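        """Store the memory trace as SVG lines in job["memory_trace_preview"],
        or an empty list if gnuplot is not available."""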
job["memory_trace_preview"] = []
if not self.gnuplot.should_render():
return
templ = self.jinja_env.get_template("memory-trace.jinja.gnu")
gnu_file = templ.render(job=job)
lines = self.gnuplot.render(gnu_file)
job["memory_trace_preview"] = lines

    @staticmethod
def render(_, jinja_env, job, gnuplot):
mtr = MemoryTraceRenderer(jinja_env=jinja_env, gnuplot=gnuplot)
mtr.render_preview(job)


@dataclasses.dataclass
class JobOutcomeTableRenderer:
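    """Renders a job's outcome table to both HTML and JSON files."""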
out_dir: pathlib.Path
jinja_env: jinja2.Environment
    html_url: pathlib.Path = None
    json_url: pathlib.Path = None

    def get_json_url(self):
return self.json_url

    def get_html_url(self):
return self.html_url

    def render_table(self, table):
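        """Write outcome.html and outcome.json under out_dir and remember
        their paths."""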
templ = self.jinja_env.get_template("outcome_table.jinja.html")
page = templ.render(table=table)
self.out_dir.mkdir(exist_ok=True, parents=True)
self.html_url = self.out_dir / "outcome.html"
with open(self.html_url, "w") as handle:
print(page, file=handle)
self.json_url = self.out_dir / "outcome.json"
with open(self.json_url, "w") as handle:
print(json.dumps(table, indent=2), file=handle)

    @staticmethod
def should_render(job):
return job["complete"] and job["loaded_outcome_dict"]

    @staticmethod
def render(out_dir, jinja_env, job):
otr = JobOutcomeTableRenderer(out_dir=out_dir, jinja_env=jinja_env)
otr.render_table(table=job["loaded_outcome_dict"])
job["outcome_table_html_url"] = str(otr.get_html_url())
job["outcome_table_json_url"] = str(otr.get_json_url())


@dataclasses.dataclass
class ParallelismGraphRenderer:
"""Renders parallelism of run over time"""
run: dict
env: jinja2.Environment
gnuplot: Gnuplot

    # The parallelism trace includes timestamps with microsecond precision, but
# gnuplot can't deal with fractional seconds. So just for the purposes of
# the graph, return the maximum parallelism encountered at each second. We
# still leave the microsecond stamps in the JSON file for those who need it.
@staticmethod
def process_trace(trace):
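        """Collapse the trace to one sample per second, keeping the highest
        'running' and 'total' counts and the lowest 'finished' count seen
        within each second."""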
def overwrite(field, op, d1, d2):
d1[field] = op(d1[field], d2[field])
tmp = {}
for item in trace:
            ms_precision = datetime.datetime.strptime(
                item["time"], litani.TIME_FORMAT_MS)
            s_precision = ms_precision.strftime(litani.TIME_FORMAT_R)
            if s_precision not in tmp:
                tmp[s_precision] = dict(item)
                tmp[s_precision].pop("time")
            else:
                overwrite("finished", min, tmp[s_precision], item)
                overwrite("running", max, tmp[s_precision], item)
                overwrite("total", max, tmp[s_precision], item)
return sorted(
[{"time": time, **item} for time, item in tmp.items()],
key=lambda x: x["time"])

    def render(self, template_name):
if not all((
self.gnuplot.should_render(),
self.run["parallelism"].get("trace")
)):
return []
gnu_templ = self.env.get_template(template_name)
gnu_file = gnu_templ.render(
n_proc=self.run["parallelism"].get("n_proc"),
max_parallelism=self.run["parallelism"].get("max_parallelism"),
trace=self.process_trace(self.run["parallelism"]["trace"]))
return [self.gnuplot.render(gnu_file)]


@dataclasses.dataclass
class StatsGroupRenderer:
"""Renders graphs for jobs that are part of a 'stats group'."""
run: dict
env: jinja2.Environment
gnuplot: Gnuplot

    def get_stats_groups(self, job_filter):
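        """Group this run's jobs by the value of their 'stats-group:<value>'
        tag, returning (group_name, jobs) pairs sorted by group name."""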
ret = {}
for job in jobs_of(self.run):
if "tags" not in job["wrapper_arguments"]:
continue
if not job["wrapper_arguments"]["tags"]:
continue
stats_group = None
for tag in job["wrapper_arguments"]["tags"]:
                kv = tag.split(":", 1)
if kv[0] != "stats-group":
continue
if len(kv) != 2:
logging.warning(
"No value for stats-group in job '%s'",
job["wrapper_arguments"]["description"])
continue
stats_group = kv[1]
break
if not (stats_group and job_filter(job)):
continue
try:
ret[stats_group].append(job)
except KeyError:
ret[stats_group] = [job]
return sorted((k, v) for k, v in ret.items())

    def render(self, job_filter, template_name):
if not self.gnuplot.should_render():
return []
stats_groups = self.get_stats_groups(job_filter)
svgs = []
gnu_templ = self.env.get_template(template_name)
for group_name, jobs in stats_groups:
if len(jobs) < 2:
continue
gnu_file = gnu_templ.render(
group_name=group_name, jobs=jobs)
svg_lines = self.gnuplot.render(gnu_file)
svgs.append(svg_lines)
return svgs


def get_run(cache_dir):
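    """Load the run from the cache directory, attaching each job's status
    and grouping jobs by pipeline and CI stage."""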
with open(cache_dir / litani.CACHE_FILE) as handle:
ret = json.load(handle)
ret["pipelines"] = {}
for job in ret["jobs"]:
status_file = litani.get_status_dir() / ("%s.json" % job["job_id"])
try:
with open(str(status_file)) as handle:
status = json.load(handle)
except FileNotFoundError:
status = {
"complete": False,
"wrapper_arguments": job,
}
pipeline_name = status["wrapper_arguments"]["pipeline_name"]
ci_stage = status["wrapper_arguments"]["ci_stage"]
        pipe = ret["pipelines"].setdefault(pipeline_name, {
            "name": pipeline_name,
            "ci_stages": {},
        })
        stage = pipe["ci_stages"].setdefault(ci_stage, {
            "name": ci_stage,
            "jobs": [],
        })
        stage["jobs"].append(status)
ret.pop("jobs")
return ret


def job_sorter(j1, j2):
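    """cmp-style comparator ordering jobs by start time, then end time; jobs
    that have not started or finished sort after those that have. Used as
    sorted(jobs, key=functools.cmp_to_key(job_sorter))."""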
if j1 is None or j2 is None:
raise ValueError("Jobs must be non-None")
if not ("start_time" in j1 or "start_time" in j2):
return 0
if not "start_time" in j1:
return 1
if not "start_time" in j2:
return -1
if j1["start_time"] != j2["start_time"]:
return -1 if j1["start_time"] < j2["start_time"] else 1
if not ("end_time" in j1 or "end_time" in j2):
return 0
if not "end_time" in j1:
return 1
if not "end_time" in j2:
return -1
return -1 if j1["end_time"] < j2["end_time"] else 1


class StageStatus(enum.IntEnum):
FAIL = 0
FAIL_IGNORED = 1
SUCCESS = 2


def add_stage_stats(stage, stage_name, pipeline_name):
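    """Annotate a stage with its progress percentage, completion flag,
    overall status, URL, and name."""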
n_complete_jobs = len([j for j in stage["jobs"] if j["complete"]])
if stage["jobs"]:
stage["progress"] = int(n_complete_jobs * 100 / len(stage["jobs"]))
stage["complete"] = n_complete_jobs == len(stage["jobs"])
else:
stage["progress"] = 0
stage["complete"] = True
status = StageStatus.SUCCESS
for job in stage["jobs"]:
try:
if not job["complete"]:
continue
if job["outcome"] == "fail":
status = StageStatus.FAIL
elif job["outcome"] == "fail_ignored" and status == StageStatus.SUCCESS:
status = StageStatus.FAIL_IGNORED
except KeyError:
logging.error(
"Could not find key in stage: %s",
json.dumps(stage, indent=2))
sys.exit(1)
stage["status"] = status.name.lower()
stage["url"] = "artifacts/%s/%s" % (pipeline_name, stage_name)
stage["name"] = stage_name


class PipeStatus(enum.IntEnum):
FAIL = 0
IN_PROGRESS = 1
SUCCESS = 2


def add_pipe_stats(pipe):
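    """Annotate a pipeline with its URL and overall PipeStatus."""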
pipe["url"] = "pipelines/%s" % pipe["name"]
incomplete = [s for s in pipe["ci_stages"] if not s["complete"]]
if incomplete:
pipe["status"] = PipeStatus.IN_PROGRESS
else:
pipe["status"] = PipeStatus.SUCCESS
for stage in pipe["ci_stages"]:
if stage["status"] in ["fail", "fail_ignored"]:
pipe["status"] = PipeStatus.FAIL
break


def add_run_stats(run):
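    """Set the run's overall status, and convert each pipeline's status from
    a PipeStatus to its lowercase name."""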
status = PipeStatus.SUCCESS
    if any(p["status"] == PipeStatus.IN_PROGRESS for p in run["pipelines"]):
        status = PipeStatus.IN_PROGRESS
    if any(p["status"] == PipeStatus.FAIL for p in run["pipelines"]):
        status = PipeStatus.FAIL
run["status"] = status.name.lower()
for pipe in run["pipelines"]:
pipe["status"] = pipe["status"].name.lower()


def add_job_stats(jobs):
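    """Compute each job's duration in seconds and a human-readable
    duration_str (None if the job has not both started and finished)."""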
for job in jobs:
if not ("start_time" in job and "end_time" in job):
job["duration_str"] = None
else:
s = datetime.datetime.strptime(
job["start_time"], litani.TIME_FORMAT_R)
e = datetime.datetime.strptime(
job["end_time"], litani.TIME_FORMAT_R)
            seconds = int((e - s).total_seconds())
job["duration_str"] = s_to_hhmmss(seconds)
job["duration"] = seconds


def sort_run(run):
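    """Sort each pipeline's jobs by start time, compute job, stage, pipeline,
    and run statistics, and order pipelines with failures first."""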
pipelines = []
js = functools.cmp_to_key(job_sorter)
for pipe in run["pipelines"].values():
stages = []
for stage in run["stages"]:
try:
pipeline_stage = pipe["ci_stages"][stage]
except KeyError:
pipe["ci_stages"][stage] = {"jobs": []}
pipeline_stage = pipe["ci_stages"][stage]
jobs = sorted(pipeline_stage["jobs"], key=js)
add_job_stats(jobs)
pipeline_stage["jobs"] = jobs
add_stage_stats(pipeline_stage, stage, pipe["name"])
stages.append(pipeline_stage)
pipe["ci_stages"] = stages
add_pipe_stats(pipe)
pipelines.append(pipe)
pipelines = sorted(pipelines, key=lambda p: p["status"])
run["pipelines"] = pipelines
add_run_stats(run)


def get_run_data(cache_dir):
run = get_run(cache_dir)
sort_run(run)
return run


def s_to_hhmmss(s):
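    """Format a duration in seconds as 'HHh MMm SSs', omitting leading zero
    fields, e.g. s_to_hhmmss(3723) == '01h 02m 03s'."""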
h, s = divmod(s, 3600)
m, s = divmod(s, 60)
if h:
return "{h:02d}h {m:02d}m {s:02d}s".format(h=h, m=m, s=s)
if m:
return "{m:02d}m {s:02d}s".format(m=m, s=s)
return "{s:02d}s".format(s=s)


def jobs_of(run):
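    """Yield every job in the run, across all pipelines and stages."""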
for pipe in run["pipelines"]:
for stage in pipe["ci_stages"]:
for job in stage["jobs"]:
yield job


def get_dashboard_svgs(run, env, gnuplot):
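    """Render the runtime, memory usage, and parallelism graphs shown on the
    dashboard, as a mapping from section title to a list of rendered SVGs
    (each a list of lines)."""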
stats_renderer = StatsGroupRenderer(run, env, gnuplot)
p_renderer = ParallelismGraphRenderer(run, env, gnuplot)
return {
"Runtime": stats_renderer.render(
lambda j: "duration" in j, "runtime-box.jinja.gnu"),
"Memory Usage": stats_renderer.render(
lambda j: j.get("memory_trace") and \
j["memory_trace"].get("peak") and \
j["memory_trace"]["peak"].get("rss"),
"memory-peak-box.jinja.gnu"),
"Parallelism": p_renderer.render("run-parallelism.jinja.gnu"),
}


def get_git_hash():
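    """Return the abbreviated hash of the Litani checkout this module lives
    in, or None if it cannot be determined."""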
try:
cmd = ["git", "show", "--no-patch", "--pretty=format:%h"]
litani_dir = pathlib.Path(os.path.realpath(__file__)).parent
proc = subprocess.run(
cmd, text=True, stdout=subprocess.PIPE, check=True, cwd=litani_dir)
return proc.stdout.strip()
    except (subprocess.CalledProcessError, FileNotFoundError):
return None


def get_summary(run):
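    """Count this run's pipelines by status."""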
ret = {
"in_progress": 0,
"success": 0,
"total": 0,
"fail": 0,
}
for pipe in run["pipelines"]:
ret["total"] += 1
ret[pipe["status"]] += 1
return ret


def render(run, report_dir, pipeline_depgraph_renderer):
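    """Render the full report into a fresh directory under the report data
    dir, then atomically publish it by renaming a symlink onto report_dir."""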
temporary_report_dir = litani.get_report_data_dir() / str(uuid.uuid4())
temporary_report_dir.mkdir(parents=True)
old_report_dir_path = litani.get_report_dir().resolve()
old_report_dir = litani.ExpireableDirectory(old_report_dir_path)
artifact_dir = temporary_report_dir / "artifacts"
shutil.copytree(litani.get_artifacts_dir(), artifact_dir)
template_dir = pathlib.Path(__file__).parent.parent / "templates"
env = jinja2.Environment(
loader=jinja2.FileSystemLoader(str(template_dir)),
autoescape=jinja2.select_autoescape(
            enabled_extensions=('html',),
default_for_string=True))
render_artifact_indexes(artifact_dir, env)
gnuplot = Gnuplot()
svgs = get_dashboard_svgs(run, env, gnuplot)
litani_report_archive_path = os.getenv("LITANI_REPORT_ARCHIVE_PATH")
dash_templ = env.get_template("dashboard.jinja.html")
page = dash_templ.render(
run=run, svgs=svgs, litani_hash=get_git_hash(),
litani_version=litani.VERSION,
litani_report_archive_path=litani_report_archive_path,
summary=get_summary(run))
with litani.atomic_write(temporary_report_dir / "index.html") as handle:
print(page, file=handle)
with litani.atomic_write(temporary_report_dir / litani.RUN_FILE) as handle:
print(json.dumps(run, indent=2), file=handle)
pipe_templ = env.get_template("pipeline.jinja.html")
for pipe in run["pipelines"]:
pipeline_depgraph_renderer.render(
render_root=temporary_report_dir,
pipe_url=pathlib.Path(pipe["url"]), pipe=pipe)
for stage in pipe["ci_stages"]:
for job in stage["jobs"]:
if JobOutcomeTableRenderer.should_render(job):
JobOutcomeTableRenderer.render(
temporary_report_dir / pipe["url"], env, job)
if MemoryTraceRenderer.should_render(job):
MemoryTraceRenderer.render(
temporary_report_dir / pipe["url"], env, job, gnuplot)
pipe_page = pipe_templ.render(run=run, pipe=pipe)
with litani.atomic_write(
temporary_report_dir / pipe["url"] / "index.html") as handle:
print(pipe_page, file=handle)
temp_symlink_dir = report_dir.with_name(report_dir.name + str(uuid.uuid4()))
os.symlink(temporary_report_dir, temp_symlink_dir)
os.rename(temp_symlink_dir, report_dir)
# Release lock so that other processes can read from this directory
new_report_dir = litani.LockableDirectory(report_dir.resolve())
new_report_dir.release()
if old_report_dir_path.exists():
old_report_dir.expire()
litani.unlink_expired()


def render_artifact_indexes(artifact_dir, env):
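    """Write an index.html listing the files and subdirectories of every
    artifact directory that does not already have one."""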
def dirs_needing_indexes():
for root, dirs, fyles in os.walk(artifact_dir):
if "index.html" not in fyles:
yield pathlib.Path(root), dirs, fyles

    index_templ = env.get_template("file-list.jinja.html")
for dyr, dirs, files in dirs_needing_indexes():
page = index_templ.render(
file_list=sorted(files), dir_list=sorted(dirs), root=str(dyr))
with litani.atomic_write(dyr / "index.html") as handle:
print(page, file=handle)