2021-11-30 14:51:24 +01:00

518 lines
17 KiB
Python

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
import datetime
import logging
import lib.litani
# Set to True by validate_single_job() / validate_run() the first time
# importing voluptuous fails, so that later calls return immediately instead
# of retrying the import.
_NO_VALIDATE = False
def _ms_time_str(string):
    """Check that *string* is a timestamp in lib.litani.TIME_FORMAT_MS.

    Used as a validator callable in the run schema, so it is called only for
    its raising behaviour and returns None on success.

    Args:
        string: the candidate timestamp.

    Raises:
        ValueError: if *string* cannot be parsed with TIME_FORMAT_MS.
    """
    try:
        datetime.datetime.strptime(string, lib.litani.TIME_FORMAT_MS)
    except (ValueError, TypeError) as e:
        # strptime() reports a malformed date with ValueError and a
        # non-string argument with TypeError; it never raises RuntimeError.
        # Both are re-raised as ValueError so that the message names the
        # offending value and the expected format.
        raise ValueError(
            "Date '%s' was not in the right format (expected '%s')" %
            (string, lib.litani.TIME_FORMAT_MS)) from e
def _time_str(string):
    """Check that *string* is a timestamp in lib.litani.TIME_FORMAT_R.

    Used as a validator callable in the run schema, so it is called only for
    its raising behaviour and returns None on success.

    Args:
        string: the candidate timestamp.

    Raises:
        ValueError: if *string* cannot be parsed with TIME_FORMAT_R.
    """
    try:
        datetime.datetime.strptime(string, lib.litani.TIME_FORMAT_R)
    except (ValueError, TypeError) as e:
        # strptime() reports a malformed date with ValueError and a
        # non-string argument with TypeError; it never raises RuntimeError.
        # Both are re-raised as ValueError so that the message names the
        # offending value and the expected format.
        raise ValueError(
            "Date '%s' was not in the right format (expected '%s')" %
            (string, lib.litani.TIME_FORMAT_R)) from e
# doc-gen
# {
# "page": "litani-run.json",
# "order": 1,
# "title": "Schema for 'wrapper_arguments' key"
# }
def _single_job_schema():
    import voluptuous

    # The *wrapper_arguments* key to run.json maps to the following dict. None
    # of the values in this dict change at any point during the run; they are
    # mostly the same as the flags passed to *litani-add-job(1)* for this job.

    return {
        "job_id": str,
        # A globally-unique ID for this job.

        "command": str,
        # The command that litani will execute in a subshell.

        "ci_stage": str,
        # The name of the 'stage' that this job will execute in, used for
        # organizing the HTML dashboard.

        "verbose": bool,

        "timeout_ok": bool,
        # If true, then if this job times out, the outcome will be set to
        # 'success'.

        "pipeline_name": str,
        # The name of the 'pipeline' that this job will execute in, used for
        # organizing the HTML dashboard.

        "very_verbose": bool,

        "timeout_ignore": bool,
        # If true, then if this job times out, the outcome will be set to
        # 'fail_ignored'.

        "profile_memory": bool,
        # If true, then litani will regularly sample the memory usage of this
        # job's command while it runs. Samples are stored in the job's
        # *memory_trace*.

        "profile_memory_interval": int,
        # How frequently (in seconds) litani will profile the command's memory
        # use, if *profile_memory* is true.

        "cwd": voluptuous.Any(str, None),
        # The directory that litani will run the command in.

        "interleave_stdout_stderr": bool,
        # Whether the command's stderr will be sent to the stdout stream. If
        # true, the job's *stderr* key will be None and the *stdout* key will
        # contain lines from both the command's stdout and stderr.

        "pool": voluptuous.Any(str, None),
        # The pool that this job will execute in; if not null, then it must be a
        # key in the *pools* dict of the overall run.

        "tags": voluptuous.Any([str], None),
        # A list of user-specified tags. Litani mostly doesn't interpret these,
        # although the HTML dashboard generator does use some of them. Tags are
        # intended to help users find particular jobs for data analysis and can
        # contain arbitrary data.

        "timeout": voluptuous.Any(int, None),
        # The number of seconds that Litani will allow the job to run for before
        # sending SIGTERM followed by SIGKILL (see *signal(3)*).

        "inputs": voluptuous.Any([str], None),
        # The list of files that should be made up-to-date before the job will
        # run.

        "outputs": voluptuous.Any([str], None),
        # The list of files that this job will make up-to-date after it
        # completes.

        "description": voluptuous.Any(str, None),
        # A human-readable description of this job.

        "status_file": voluptuous.Any(str, None),

        "stderr_file": voluptuous.Any(str, None),
        # A file to redirect stderr to, as well as buffering it internally.

        "stdout_file": voluptuous.Any(str, None),
        # A file to redirect stdout to, as well as buffering it internally.

        "ok_returns": voluptuous.Any([str], None),
        # A list of return codes. If the command exits with any of these return
        # codes (or 0), then the outcome will be set to 'success'.

        "outcome_table": voluptuous.Any(str, None),
        # A file to load an outcome table from.

        "phony_outputs": voluptuous.Any([str], None),
        # A list of outputs that Litani will not warn about if they were not
        # created by the job.

        "ignore_returns": voluptuous.Any([str], None),
        # A list of return codes. If the command exits with any of these return
        # codes, then the outcome will be set to 'fail_ignored'.

        "subcommand": voluptuous.Any("exec", "add-job"),
    }
def validate_single_job(job):
    """Validate *job* against the 'wrapper_arguments' schema.

    Validation is best-effort: if voluptuous cannot be imported, a debug
    message is logged and the module-level _NO_VALIDATE flag is set so that
    later calls return immediately without retrying the import.

    Args:
        job: the dict to check against _single_job_schema(); every key of
            the schema is required.

    Raises:
        voluptuous.Error: raised by validate_with_humanized_errors() when
            *job* does not match the schema.
    """
    global _NO_VALIDATE
    if _NO_VALIDATE:
        return

    # Keep the try body down to the imports so that only a missing
    # dependency, and nothing raised during validation, disables validation.
    try:
        import voluptuous
        import voluptuous.humanize
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so this single
        # clause covers both.
        logging.debug("voluptuous not installed; not validating schema")
        _NO_VALIDATE = True
        return

    schema = voluptuous.Schema(_single_job_schema(), required=True)
    voluptuous.humanize.validate_with_humanized_errors(job, schema)
def validate_run(run):
    """Validate *run* against the schema for an entire run.json file.

    Validation is best-effort: if voluptuous cannot be imported, a debug
    message is logged and the module-level _NO_VALIDATE flag is set so that
    later calls return immediately without retrying the import.

    Args:
        run: the dict to check against _run_schema(); every key not marked
            Optional in the schema is required.

    Raises:
        voluptuous.Error: raised by validate_with_humanized_errors() when
            *run* does not match the schema.
    """
    global _NO_VALIDATE
    if _NO_VALIDATE:
        return

    # Keep the try body down to the imports so that only a missing
    # dependency, and nothing raised during validation, disables validation.
    try:
        import voluptuous
        import voluptuous.humanize
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so this single
        # clause covers both.
        logging.debug("voluptuous not installed; not validating schema")
        _NO_VALIDATE = True
        return

    schema = voluptuous.Schema(_run_schema(), required=True)
    voluptuous.humanize.validate_with_humanized_errors(run, schema)
# doc-gen
# {
# "page": "litani-run.json",
# "order": 3,
# "title": "Schema for a job or ci_stage outcome"
# }
def _outcome():
    import voluptuous

    # Jobs and ci_stages have an *"outcome"* (though, confusingly, the key
    # is *"status"* for ci_stages). "fail_ignored" means that the job failed but
    # the user specified that the job's dependencies should run anyway. If a
    # pipeline contains a job whose outcome is "fail_ignored", then the status
    # of the pipeline will be "fail" after all of its jobs complete.

    return voluptuous.Any("success", "fail", "fail_ignored")
# end-doc-gen
# doc-gen
# {
# "page": "litani-run.json",
# "order": 2,
# "title": "Schema for a pipeline or run status"
# }
def _status():
    import voluptuous

    # Pipelines and runs have a *"status"*. The status is "in_progress" while
    # some of the jobs are incomplete, and either "success" or "fail" once all
    # jobs complete.

    return voluptuous.Any("success", "fail", "in_progress")
# end-doc-gen
# doc-gen
# {
# "page": "litani-run.json",
# "order": 0,
# "title": "Schema for entire run.json file"
# }
def _run_schema():
    import voluptuous

    return {
        "run_id": str,
        # A globally-unique ID for the run.

        "project": str,
        # A name for the project that this run is part of. This name is used by
        # the HTML report generator and can be used to group related sets of
        # runs, but is otherwise not used by litani.

        "stages": [str],
        # The CI stages that each job can be a member of. Stage names can
        # be provided through the --stages flag of *litani-init(1)*. Default
        # stages "build", "test" and "report" are used if the flag is not used.

        "pools": {voluptuous.Optional(str): int},
        # A mapping from pool names to the depth of the pool. Jobs can be a
        # member of zero or one pool. The depth of a pool that a set of jobs
        # belong to limits the number of those jobs that litani will run in
        # parallel.

        "start_time": _time_str,
        # The time at which the run started.

        "version": str,
        # The version string of the Litani binary that ran this run.

        "version_major": int,
        # Litani's major version number.

        "version_minor": int,
        # Litani's minor version number.

        "version_patch": int,
        # Litani's patch version number.

        "release_candidate": bool,
        # false if this version of Litani is a tagged release.

        voluptuous.Optional("end_time"): _time_str,
        # The time at which the run ended. This key will only exist if *status*
        # is not equal to "in_progress".

        "status": _status(),
        # The state of this run, see the status schema below.

        "aux": dict,
        # A free-form dict that users can add custom information into. There are
        # no constraints on the format of this dict, but it is recommended that
        # users add their information to a sub-dict with a key that indicates
        # its function. For example, to add information pertaining to a CI run,
        # users might add a key called "continuous_integration_data" whose value
        # is a sub-dict containing all required fields.

        "parallelism": voluptuous.Any({
            # This dict contains information about the parallelism level of the jobs
            # that litani runs. This is to measure whether the run is using as many
            # processor cores as possible over the duration of the run.

            voluptuous.Optional("trace"): [{
                # A list of samples of the run's concurrency level.

                "time": _ms_time_str,
                # The time at which the sample was taken.

                "finished": int,
                # How many jobs have finished.

                "running": int,
                # How many jobs are running.

                "total": int,
                # The total number of jobs.
            }],

            voluptuous.Optional("max_parallelism"): int,
            # The maximum parallelism attained over the run.

            voluptuous.Optional("n_proc"): voluptuous.Any(None, int),
            # The number of processors detected on this machine.
        }),

        "pipelines": [{
            # Each pipeline contains ci_stages which contain jobs.

            "url": str,

            "name": str,
            # The pipeline name. The set of pipeline names is all the names
            # passed to the --pipeline-name flag of *litani-add-job(1)*.

            "status": _status(),
            # The pipeline's state, see the status schema below.

            "ci_stages": [{
                # Each ci_stage contains a list of jobs.

                "url": str,

                "complete": bool,
                # Whether all the jobs in this stage are complete.

                "name": str,
                # The stage's name. This is any of the *stages* of
                # the project.

                "status": _outcome(),
                # The stage's state, see the outcome schema below.

                "progress": voluptuous.All(int, voluptuous.Range(min=0, max=100)),

                "jobs": [voluptuous.Any({
                    # The list of all the jobs in this ci_stage in this pipeline.
                    # There are three different forms the value of this key can
                    # take.

                    "complete": False,
                    # If *complete* is false and no *start_time* key exists,
                    # then this job has not yet started.

                    "duration_str": None,

                    "wrapper_arguments": _single_job_schema(),
                    # The arguments passed to this job, see the
                    # single_job_schema schema below.
                }, {
                    "complete": False,
                    # If *complete* is false but the *start_time* key exists,
                    # then the job has started running but has not yet finished.

                    "start_time": _time_str,
                    # The time at which the job started running.

                    "duration_str": None,

                    "wrapper_arguments": _single_job_schema(),
                    # The arguments passed to this job, see the
                    # single_job_schema schema below.
                }, {
                    "duration": int,
                    # How long the job ran for.

                    "complete": True,
                    # If *complete* is true, then the job has terminated.

                    "outcome": _outcome(),
                    # The job's outcome, see the outcome schema below.

                    "end_time": _time_str,
                    # The time at which the job completed.

                    "start_time": _time_str,
                    # The time at which the job started running.

                    "timeout_reached": bool,
                    # Whether the job reached its timeout limit.

                    "command_return_code": int,
                    # The command's return code.

                    "wrapper_return_code": int,

                    "stderr": voluptuous.Any([str], None),
                    # A list of strings that the command printed to its stderr.

                    "stdout": voluptuous.Any([str], None),
                    # A list of strings that the command printed to its stdout.

                    "duration_str": voluptuous.Any(str, None),
                    # A human-readable duration of this job (HH:MM:SS).

                    "wrapper_arguments": _single_job_schema(),
                    # The arguments passed to this job, see the
                    # single_job_schema schema below.

                    "loaded_outcome_dict": voluptuous.Any(dict, None),
                    # If *wrapper_arguments["outcome_table"]* is not null, the
                    # value of this key will be the deserialized data loaded
                    # from the outcome table file.

                    "memory_trace": {
                        # If *profile_memory* was set to true in the wrapper
                        # arguments for this job, this dict will contain samples of
                        # the command's memory usage.

                        voluptuous.Optional("peak"): {
                            # The command's peak memory usage.

                            "rss": int,
                            # Peak resident set.

                            "vsz": int,
                            # Peak virtual memory size.

                            "human_readable_rss": str,
                            # Peak resident set, as a human-readable string.

                            "human_readable_vsz": str,
                            # Peak virtual memory size, as a human-readable
                            # string.
                        },

                        voluptuous.Optional("trace"): [{
                            # A list of samples of memory usage.

                            "rss": int,
                            # Resident set.

                            "vsz": int,
                            # Virtual memory.

                            "time": _time_str,
                            # The time at which the sample was taken.
                        }],
                    },
                })],
            }],
        }],

        "latest_symlink": voluptuous.Any(str, None),
        # The symbolic link to the report advertised to users.
    }
# end-doc-gen
def validate_outcome_table(table):
    """Validate a user-provided outcome table against its schema.

    Validation is skipped, with a debug log message, when voluptuous is not
    installed.

    Args:
        table: the deserialized outcome table to check against
            _outcome_table_schema().

    Raises:
        voluptuous.Error: raised by validate_with_humanized_errors() when
            *table* does not match the schema.
    """
    try:
        import voluptuous
        # Importing a package does not load its submodules. Without this
        # explicit import, the voluptuous.humanize lookup below raises
        # AttributeError unless some other code imported it first.
        import voluptuous.humanize
    except ImportError:
        logging.debug("Skipping outcome table validation as voluptuous is not installed")
        return

    # NOTE(review): unlike the run and single-job validators, this Schema is
    # built without required=True, so keys that are not wrapped in Optional
    # are not enforced -- confirm whether that is intentional.
    schema = voluptuous.Schema(_outcome_table_schema())
    voluptuous.humanize.validate_with_humanized_errors(table, schema)
# doc-gen
# {
# "page": "litani-outcome-table.json",
# "order": 0,
# "title": "Schema for user-provided outcome table"
# }
def _outcome_table_schema():
    import voluptuous

    return {
        voluptuous.Optional("comment"): str,
        # A description of the outcome table as a whole.

        "outcomes": [
            # The outcome of the job will be determined by the first item in
            # this list that matches.

            voluptuous.Any({
                "type": "return-code",
                # If the return code of the job matches the value of *value*,
                # the outcome will be set to the value of *action*. The value
                # of the optional *comment* key can contain a human-readable
                # explanation for this outcome.

                "value": int,

                "action": _outcome(),

                voluptuous.Optional("comment"): str,
            }, {
                "type": "timeout",
                # If this job timed out, the outcome will be set to the value of
                # *action*. The value of the optional *comment* key can contain
                # a human-readable explanation for this outcome.

                "action": _outcome(),

                voluptuous.Optional("comment"): str,
            }, {
                "type": "wildcard",
                # The *"wildcard"* action type matches any job and sets its
                # outcome to the value of *action*. It is recommended to place a
                # *wildcard* action as the last element of the list of
                # *outcomes* to catch all jobs that were not matched by a
                # previous rule.

                "action": _outcome(),

                voluptuous.Optional("comment"): str,
            })]
    }