# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

import dataclasses
import os
import pathlib
import re
import textwrap

import lib.litani
import lib.litani_report


class Node:
    """Base class for graph nodes; hosts the string-escaping helpers."""

    # Characters that get a backslash prefix inside a double-quoted DOT
    # attribute value.
    _DOT_SPECIAL = re.compile(r'[";]')

    # Characters that must become entities inside an HTML-like label.
    _HTML_SPECIAL = re.compile(r'[&"<>]')
    _HTML_ENTITIES = {
        "&": "&amp;",
        '"': "&quot;",
        "<": "&lt;",
        ">": "&gt;",
    }

    @staticmethod
    def escape(string):
        """Return *string* with every '"' and ';' prefixed by a backslash."""
        return Node._DOT_SPECIAL.sub(r"\\\g<0>", string)

    @staticmethod
    def html_escape(string):
        """Return *string* with '&', '"', '<' and '>' replaced by entities.

        The substitution is a single pass, so the '&' that introduces a
        freshly produced entity is never escaped a second time.
        """
        return Node._HTML_SPECIAL.sub(
            lambda match: Node._HTML_ENTITIES[match.group(0)], string)


class DependencyNode(Node):
    """A node standing for a file that a command reads or writes."""

    def __init__(self, fyle, line_width=40, **style):
        """Create a node for *fyle*.

        Args:
            fyle: path of the file; its hash becomes the node id.
            line_width: maximum width of each line of the label.
            **style: extra DOT attributes; "label" is always overwritten.
        """
        self.file = fyle
        self.id = hash(fyle)
        self.style = style

        # `Path.name` already contains the extension, so it must not be
        # appended a second time.
        file_name = pathlib.Path(fyle).name
        self.style["label"] = "\n".join(
            textwrap.wrap(file_name, width=line_width))

    def __hash__(self):
        return self.id

    def __eq__(self, other):
        if not isinstance(other, Node):
            return NotImplemented
        return self.id == other.id

    def __str__(self):
        """Render the node as a DOT node statement."""
        style = ",".join(
            f'"{key}"="{Node.escape(value)}"'
            for key, value in self.style.items())
        return f'"{self.id}" [{style}];'


class CommandNode(Node):
    """A node standing for a command, drawn with an HTML-like table label."""

    def __init__(
            self, pipeline_name, description, command, line_width=40,
            **style):
        """Create a node for *command*.

        Args:
            pipeline_name: name of the pipeline the command belongs to.
            description: optional description; falsy values leave the
                description row out of the label.
            command: the command text; its hash becomes the node id.
            line_width: maximum width of each line of a label cell.
            **style: extra DOT attributes; "shape" is always overwritten.
        """
        self.id = hash(command)
        self.pipeline_name = self._wrap_html(pipeline_name, line_width)
        if description:
            self.description = self._wrap_html(description, line_width)
        else:
            self.description = ""
        self.command = self._wrap_html(command, line_width)

        self.style = style
        self.style["shape"] = "plain"

    @staticmethod
    def _wrap_html(text, line_width):
        """Wrap *text* and join the lines with '<br/>' for an HTML-like label.

        Each line is escaped after wrapping so that a long word can never be
        broken in the middle of an entity such as '&amp;'.
        """
        return "<br/>".join(
            Node.html_escape(line)
            for line in textwrap.wrap(text, width=line_width))

    def __hash__(self):
        return self.id

    def __eq__(self, other):
        if not isinstance(other, Node):
            return NotImplemented
        return self.id == other.id

    def __str__(self):
        """Render the node as a DOT node statement with an HTML-like label.

        The label is a one-column table: an optional description row,
        followed by the pipeline name and the command.
        """
        cells = [self.pipeline_name, self.command]
        if self.description:
            cells.insert(0, self.description)
        rows = "\n".join(f"<tr><td>{cell}</td></tr>" for cell in cells)
        style = ",".join(
            f'{key}="{Node.escape(value)}"'
            for key, value in self.style.items())
        return (
            f'"{self.id}" [label=<\n'
            '<table border="0" cellborder="1" cellspacing="0"'
            ' cellpadding="4">\n'
            f"{rows}\n"
            "</table>\n"
            f"> {style}];")


class Edge:
    """A directed edge between two nodes."""

    def __init__(self, src, dst, **style):
        self.src = src
        self.dst = dst
        self.style = style

    def __eq__(self, other):
        if not isinstance(other, Edge):
            return NotImplemented
        return self.src == other.src and self.dst == other.dst

    def __hash__(self):
        # Hash the ordered pair so that a->b and b->a land in different
        # buckets; equality already distinguishes them.
        return hash((self.src, self.dst))

    def __str__(self):
        """Render the edge as a DOT edge statement."""
        style = ",".join(
            f'"{key}"="{value}"' for key, value in self.style.items())
        return f'"{self.src.id}" -> "{self.dst.id}" [{style}];'


def _add_job_edges(job_args, cmd_node, nodes, edges):
    """Add *cmd_node* plus its input/output file nodes and edges.

    A missing or empty "inputs"/"outputs" entry in *job_args* is treated as
    no files.
    """
    nodes.add(cmd_node)
    for in_file in job_args.get("inputs") or []:
        in_node = DependencyNode(in_file)
        nodes.add(in_node)
        edges.add(Edge(src=in_node, dst=cmd_node))
    for out_file in job_args.get("outputs") or []:
        out_node = DependencyNode(out_file)
        nodes.add(out_node)
        edges.add(Edge(src=cmd_node, dst=out_node))


@dataclasses.dataclass
class SinglePipelineGraph:
    """Dependency graph of one pipeline, with jobs colored by outcome."""

    pipe: dict
    nodes: set = dataclasses.field(default_factory=set)
    edges: set = dataclasses.field(default_factory=set)

    # Fill color of a completed job, keyed by its outcome. Unannotated on
    # purpose so that it is not a dataclass field.
    _OUTCOME_COLORS = {
        "success": "#90caf9",
        "fail_ignored": "#ffecb3",
        "fail": "#ef9a9a",
    }
    _INCOMPLETE_COLOR = "#eceff1"

    def iter_jobs(self):
        """Yield every job of every stage of the pipeline."""
        for stage in self.pipe["ci_stages"]:
            yield from stage["jobs"]

    def build(self):
        """Populate `nodes` and `edges` from the pipeline's jobs."""
        for job in self.iter_jobs():
            args = job["wrapper_arguments"]
            cmd_node = self._make_cmd_node(
                job["complete"], job.get("outcome", None),
                args["pipeline_name"], args["description"], args["command"])
            _add_job_edges(args, cmd_node, self.nodes, self.edges)

    @staticmethod
    def _make_cmd_node(complete, outcome, pipeline_name, description, command):
        """Return a filled CommandNode colored according to the job state.

        Raises:
            RuntimeError: the job is complete but *outcome* is not one of
                "success", "fail_ignored" or "fail".
        """
        colors = SinglePipelineGraph._OUTCOME_COLORS
        if not complete:
            fill = SinglePipelineGraph._INCOMPLETE_COLOR
        elif outcome in colors:
            fill = colors[outcome]
        else:
            raise RuntimeError("Unknown outcome '%s'" % outcome)
        return CommandNode(
            pipeline_name, description, command,
            style="filled", fillcolor=fill)

    def as_dot(self):
        """Return the graph as DOT source with a transparent background."""
        buf = ["digraph G {", 'bgcolor="transparent"']
        buf.extend(f"  {node}" for node in self.nodes)
        buf.extend(f"  {edge}" for edge in self.edges)
        buf.append("}")
        return "\n".join(buf)

    @staticmethod
    def render(pipe):
        """Build the graph for *pipe* and return its DOT source."""
        spg = SinglePipelineGraph(pipe)
        spg.build()
        return spg.as_dot()


@dataclasses.dataclass
class PipelineChooser:
    """Filter that restricts output to an optional list of pipeline names."""

    pipelines: list

    def should_skip(self, pipeline):
        """Return True when a filter is set and *pipeline* is not in it.

        An empty or missing filter list skips nothing.
        """
        return bool(self.pipelines) and pipeline not in self.pipelines


@dataclasses.dataclass
class Graph:
    """Dependency graph of all chosen pipelines of a run."""

    run: dict
    pipeline_chooser: PipelineChooser

    def iter_jobs(self):
        """Yield every job of every pipeline the chooser does not skip."""
        for pipe in self.run["pipelines"]:
            if self.pipeline_chooser.should_skip(pipe["name"]):
                continue
            for stage in pipe["ci_stages"]:
                yield from stage["jobs"]

    def __str__(self):
        """Return the graph as DOT source."""
        nodes = set()
        edges = set()
        for job in self.iter_jobs():
            args = job["wrapper_arguments"]
            cmd_node = CommandNode(
                args["pipeline_name"], args["description"], args["command"])
            _add_job_edges(args, cmd_node, nodes, edges)

        buf = ["digraph G {"]
        buf.extend(f"  {node}" for node in nodes)
        buf.extend(f"  {edge}" for edge in edges)
        buf.append("}")
        return "\n".join(buf)


async def print_graph(args):
    """Print the DOT graph of the current run to standard output.

    Args:
        args: object whose `pipelines` attribute lists the pipeline names to
            include; a falsy value includes every pipeline.
    """
    lib.litani.add_jobs_to_cache()
    run = lib.litani_report.get_run_data(lib.litani.get_cache_dir())

    chooser = PipelineChooser(args.pipelines)
    print(Graph(run=run, pipeline_chooser=chooser))