2021-11-30 14:51:24 +01:00

195 lines
5.6 KiB
Python

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import dataclasses
import datetime
import io
import math
import os
import pathlib
import re
import subprocess
import threading
import lib.litani
@dataclasses.dataclass
class _StatusParser:
# Format strings documented here:
# https://ninja-build.org/manual.html#_environment_variables
status_format: str = "<ninja>:%r/%f/%t "
status_re: re.Pattern = re.compile(
r"<ninja>:(?P<running>\d+)/(?P<finished>\d+)/(?P<total>\d+) "
r"(?P<message>.+)")
def parse_status(self, status_str):
m = self.status_re.match(status_str)
if not m:
return None
ret = {k: int(m[k]) for k in ["running", "finished", "total"]}
return {**ret, **{"message": m["message"]}}
@dataclasses.dataclass
class _OutputAccumulator:
out_stream: io.RawIOBase
status_parser: _StatusParser
concurrency_graph: list = dataclasses.field(default_factory=list)
finished: int = None
total: int = None
thread: threading.Thread = None
@staticmethod
def get_tty_width():
try:
proc = subprocess.run(
["tput", "cols"], text=True, stdout=subprocess.PIPE, check=True)
except (FileNotFoundError, subprocess.CalledProcessError):
return None
if proc.returncode:
return None
try:
return int(proc.stdout.strip())
except TypeError:
return None
def print_progress(self, message):
tty_width = self.get_tty_width()
if not tty_width:
message_fmt = message
else:
f_width = int(math.log10(self.finished)) + 1 if self.finished else 1
t_width = int(math.log10(self.total)) + 1 if self.total else 1
progress_width = f_width + t_width + len("[/] ")
if len(message) + progress_width <= tty_width:
message_fmt = "[%d/%d] %s%s" % (
self.finished, self.total, message,
" " * (tty_width - len(message) - progress_width))
else:
message_width = tty_width - progress_width - len("...")
message_fmt = "[%d/%d] %s..." % (
self.finished, self.total, message[:message_width])
print("\r%s" % message_fmt, end="")
def process_output(self):
while True:
try:
line = self.out_stream.readline()
if not line:
return
except ValueError:
return
status = self.status_parser.parse_status(line)
if not status:
print(line)
continue
now = datetime.datetime.now(datetime.timezone.utc)
now_str = now.strftime(lib.litani.TIME_FORMAT_MS)
status["time"] = now_str
self.concurrency_graph.append(status)
if any((
status["finished"] != self.finished,
status["total"] != self.total)):
self.finished = status["finished"]
self.total = status["total"]
self.print_progress(status["message"])
def join(self):
self.thread.join()
print()
def start(self):
self.thread = threading.Thread(target=self.process_output)
self.thread.start()
@dataclasses.dataclass
class Runner:
ninja_file: pathlib.Path
dry_run: bool
parallelism: int
pipelines: list
ci_stage: str
proc: subprocess.CompletedProcess = None
status_parser: _StatusParser = _StatusParser()
out_acc: _OutputAccumulator = None
def _get_cmd(self):
cmd = [
"ninja",
"-k", "0",
"-f", self.ninja_file,
]
if self.parallelism:
cmd.extend(["-j", self.parallelism])
if self.dry_run:
cmd.append("-n")
if self.pipelines:
targets = ["__litani_pipeline_name_%s" % p for p in self.pipelines]
cmd.extend(targets)
elif self.ci_stage:
targets = ["__litani_ci_stage_%s" % p for p in self.ci_stage]
cmd.extend(targets)
return [str(c) for c in cmd]
def run(self):
env = {
**os.environ,
**{"NINJA_STATUS": self.status_parser.status_format},
}
with subprocess.Popen(
self._get_cmd(), env=env, stdout=subprocess.PIPE, text=True,
) as proc:
self.proc = proc
self.out_acc = _OutputAccumulator(proc.stdout, self.status_parser)
self.out_acc.start()
self.out_acc.join()
def was_successful(self):
return not self.proc.returncode
def get_parallelism_graph(self):
trace = []
for item in self.out_acc.concurrency_graph:
tmp = dict(item)
tmp.pop("message")
trace.append(tmp)
return {
"trace": trace,
"max_parallelism": max(
[i["running"] for i in self.out_acc.concurrency_graph]),
"n_proc": os.cpu_count(),
}