# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import dataclasses
import os
import pathlib
import re
import textwrap
import lib.litani
import lib.litani_report
class Node:
    """Base class for graph nodes; provides text-escaping helpers."""

    @staticmethod
    def escape(string):
        """Return *string* with ``"`` and ``;`` backslash-escaped.

        Intended for values that are emitted inside double-quoted DOT
        attribute strings.
        """
        for match, repl in (('"', '\\"'), (';', '\\;')):
            string = string.replace(match, repl)
        return string

    @staticmethod
    def html_escape(string):
        """Return *string* with ``&``, ``"``, ``<`` and ``>`` replaced by
        their HTML character entities.
        """
        # '&' must be handled first, otherwise the ampersands introduced by
        # the later replacements would themselves be escaped again.
        for match, repl in (
                ('&', '&amp;'),
                ('"', '&quot;'),
                ('<', '&lt;'),
                ('>', '&gt;'),
        ):
            string = string.replace(match, repl)
        return string
class DependencyNode(Node):
    """Graph node representing a file, labelled with the file's base name."""

    def __init__(self, fyle, line_width=40, **style):
        """Create a node for the path *fyle*.

        Args:
            fyle: path of the file; its hash is used as the node id.
            line_width: maximum label line length before wrapping.
            **style: extra DOT attributes for the node. A "label" entry is
                always overwritten with the wrapped file name.
        """
        self.file = fyle
        self.id = hash(fyle)
        self.style = style
        # Path.name already contains the extension, so it must not be
        # appended a second time (that produced labels like "foo.txt.txt").
        file_name = pathlib.Path(fyle).name
        self.style["label"] = "\n".join(
            textwrap.wrap(file_name, width=line_width))

    def __hash__(self):
        return self.id

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared with something that is not a DependencyNode.
        if not isinstance(other, DependencyNode):
            return NotImplemented
        return self.id == other.id

    def __str__(self):
        """Return the DOT statement declaring this node."""
        attributes = ",".join(
            f'"{key}"="{Node.escape(value)}"'
            for key, value in self.style.items())
        return f'"{self.id}" [{attributes}];'
class CommandNode(Node):
    """Graph node representing a job's command.

    The node is drawn with a Graphviz HTML-like label: a one-column table
    holding the pipeline name, the optional description and the command.
    """

    def __init__(
            self, pipeline_name, description, command, line_width=40, **style):
        """Create a node for *command*.

        Args:
            pipeline_name: name of the pipeline the command belongs to.
            description: human-readable description; may be empty or None,
                in which case no description row is drawn.
            command: the command line; its hash is used as the node id.
            line_width: maximum line length before wrapping label text.
            **style: extra DOT attributes for the node. "shape" is always
                set to "plain".
        """
        self.id = hash(command)
        self.pipeline_name = self._wrap_html(pipeline_name, line_width)
        if description:
            self.description = self._wrap_html(description, line_width)
        else:
            self.description = ""
        self.command = self._wrap_html(command, line_width)
        self.style = style
        self.style["shape"] = "plain"

    @staticmethod
    def _wrap_html(text, line_width):
        """Wrap *text*, HTML-escape each line and join lines with <BR/>."""
        # Wrapping happens before escaping so that a long word can never be
        # broken in the middle of a character entity such as "&amp;".
        return "<BR/>".join(
            Node.html_escape(line)
            for line in textwrap.wrap(text, width=line_width))

    def __hash__(self):
        return self.id

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared with something that is not a CommandNode.
        if not isinstance(other, CommandNode):
            return NotImplemented
        return self.id == other.id

    def __str__(self):
        """Return the DOT statement declaring this node."""
        if self.description:
            desc_cell = f"\n<TR><TD>{self.description}</TD></TR>"
        else:
            desc_cell = ""
        # NOTE(review): the table attributes below are a layout choice;
        # confirm they match the intended rendering.
        return '''"{id}" [label=<
<TABLE BORDER="0" CELLBORDER="1" CELLSPACING="0">
<TR><TD>{pipeline_name}</TD></TR>{desc_cell}
<TR><TD>{command}</TD></TR>
</TABLE>
> {style}];'''.format(
            id=self.id, command=self.command,
            desc_cell=desc_cell,
            pipeline_name=self.pipeline_name,
            style=",".join(
                f'{key}="{Node.escape(value)}"'
                for key, value in self.style.items()))
class Edge:
    """Directed edge between two graph nodes."""

    def __init__(self, src, dst, **style):
        """Create an edge from *src* to *dst*.

        Args:
            src: source node; must be hashable and expose an ``id``.
            dst: destination node; must be hashable and expose an ``id``.
            **style: DOT attributes for the edge.
        """
        self.src = src
        self.dst = dst
        self.style = style

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared with something that is not an Edge.
        if not isinstance(other, Edge):
            return NotImplemented
        return self.src == other.src and self.dst == other.dst

    def __hash__(self):
        # Hash the ordered pair so that A -> B and B -> A do not
        # systematically collide, as they would with a plain sum.
        return hash((self.src, self.dst))

    def __str__(self):
        """Return the DOT statement declaring this edge."""
        attributes = ",".join(
            f'"{key}"="{value}"' for key, value in self.style.items())
        return f'"{self.src.id}" -> "{self.dst.id}" [{attributes}];'
@dataclasses.dataclass
class SinglePipelineGraph:
    """Dependency graph of the jobs of one pipeline, renderable as DOT."""

    # Pipeline record; read as pipe["ci_stages"][i]["jobs"][j].
    pipe: dict
    # Nodes and edges accumulated by build().
    nodes: set = dataclasses.field(default_factory=set)
    edges: set = dataclasses.field(default_factory=set)

    def iter_jobs(self):
        """Yield every job of every stage of the pipeline, in order."""
        for stage in self.pipe["ci_stages"]:
            yield from stage["jobs"]

    def build(self):
        """Populate ``nodes`` and ``edges`` from the pipeline's jobs.

        Each job contributes one command node, one file node per input and
        output, and an edge for every input -> command and command -> output
        relation. Missing or empty "inputs"/"outputs" are treated as none.
        """
        for job in self.iter_jobs():
            args = job["wrapper_arguments"]
            cmd_node = self._make_cmd_node(
                job["complete"], job.get("outcome"),
                args["pipeline_name"], args["description"], args["command"])
            self.nodes.add(cmd_node)
            # The node classes are referenced by their bare names, like the
            # rest of this file does, rather than through a package path
            # that this file never imports.
            for inputt in args.get("inputs") or []:
                in_node = DependencyNode(inputt)
                self.nodes.add(in_node)
                self.edges.add(Edge(src=in_node, dst=cmd_node))
            for output in args.get("outputs") or []:
                out_node = DependencyNode(output)
                self.nodes.add(out_node)
                self.edges.add(Edge(src=cmd_node, dst=out_node))

    @staticmethod
    def _make_cmd_node(complete, outcome, pipeline_name, description, command):
        """Return a filled CommandNode coloured by the job's state.

        Raises:
            RuntimeError: if the job is complete but *outcome* is not one of
                "success", "fail_ignored" or "fail".
        """
        cmd_style = {"style": "filled"}
        if not complete:
            cmd_style["fillcolor"] = "#eceff1"
        elif outcome == "success":
            cmd_style["fillcolor"] = "#90caf9"
        elif outcome == "fail_ignored":
            cmd_style["fillcolor"] = "#ffecb3"
        elif outcome == "fail":
            cmd_style["fillcolor"] = "#ef9a9a"
        else:
            raise RuntimeError("Unknown outcome '%s'" % outcome)
        return CommandNode(
            pipeline_name, description, command, **cmd_style)

    def as_dot(self):
        """Return the accumulated graph as DOT source text."""
        buf = ["digraph G {", 'bgcolor="transparent"']
        buf.extend(f" {node}" for node in self.nodes)
        buf.extend(f" {edge}" for edge in self.edges)
        buf.append("}")
        return "\n".join(buf)

    @staticmethod
    def render(pipe):
        """Build the graph for *pipe* and return it as DOT source text."""
        spg = SinglePipelineGraph(pipe)
        spg.build()
        return spg.as_dot()
@dataclasses.dataclass
class PipelineChooser:
    """Filter that decides which pipelines are included in a graph."""

    # Names of the pipelines to keep; empty or None keeps every pipeline.
    pipelines: list

    def should_skip(self, pipeline):
        """Return True if *pipeline* is excluded by the filter.

        Always returns a bool: False when no filter is configured, otherwise
        whether *pipeline* is absent from ``pipelines``.
        """
        return bool(self.pipelines) and pipeline not in self.pipelines
@dataclasses.dataclass
class Graph:
    """Dependency graph of all selected pipelines of a run, printable as DOT."""

    # Run record; read as run["pipelines"][i]["ci_stages"][j]["jobs"][k].
    run: dict
    # Decides which pipelines (by their "name") are left out of the graph.
    pipeline_chooser: PipelineChooser

    def iter_jobs(self):
        """Yield every job of every pipeline that the chooser does not skip."""
        for pipe in self.run["pipelines"]:
            if self.pipeline_chooser.should_skip(pipe["name"]):
                continue
            for stage in pipe["ci_stages"]:
                yield from stage["jobs"]

    def __str__(self):
        """Return the graph as DOT source text.

        Each job contributes one command node, one file node per input and
        output, and an edge for every input -> command and command -> output
        relation. Missing or empty "inputs"/"outputs" are treated as none.
        """
        nodes = set()
        edges = set()
        for job in self.iter_jobs():
            args = job["wrapper_arguments"]
            cmd_node = CommandNode(
                args["pipeline_name"], args["description"], args["command"])
            nodes.add(cmd_node)
            # .get() keeps a job without an "outputs"/"inputs" key from
            # raising KeyError; None and empty lists mean "no files".
            for output in args.get("outputs") or []:
                out_node = DependencyNode(output)
                nodes.add(out_node)
                edges.add(Edge(src=cmd_node, dst=out_node))
            for inputt in args.get("inputs") or []:
                in_node = DependencyNode(inputt)
                nodes.add(in_node)
                edges.add(Edge(src=in_node, dst=cmd_node))
        buf = ["digraph G {"]
        buf.extend(f" {node}" for node in nodes)
        buf.extend(f" {edge}" for edge in edges)
        buf.append("}")
        return "\n".join(buf)
async def print_graph(args):
    """Print the DOT graph of the run found in the cache directory.

    Args:
        args: parsed arguments; ``args.pipelines`` selects the pipelines to
            include (passed to PipelineChooser).
    """
    lib.litani.add_jobs_to_cache()
    cache_dir = lib.litani.get_cache_dir()
    run_data = lib.litani_report.get_run_data(cache_dir)
    chooser = PipelineChooser(args.pipelines)
    print(Graph(run=run_data, pipeline_chooser=chooser))