# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Run a projects tests and load them into stestr."""
import os
import subprocess
import sys
import warnings
from cliff import command
import six
import subunit
import testtools
from stestr import bisect_tests
from stestr.commands import load
from stestr.commands import slowest
from stestr import config_file
from stestr import output
from stestr.repository import abstract as repository
from stestr.repository import util
from stestr.testlist import parse_list
from stestr import user_config
class Run(command.Command):
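    """Run the tests for a project and load them into a repository."""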
    def get_parser(self, prog_name):
parser = super(Run, self).get_parser(prog_name)
parser.add_argument("filters", nargs="*", default=None,
help="A list of string regex filters to initially "
"apply on the test list. Tests that match any of "
"the regexes will be used. (assuming any other "
"filtering specified also uses it)")
parser.add_argument("--failing", action="store_true",
default=False,
help="Run only tests known to be failing.")
parser.add_argument("--serial", action="store_true",
default=False,
help="Run tests in a serial process.")
parser.add_argument("--concurrency", action="store", default=None,
type=int,
help="How many processes to use. The default (0) "
"autodetects your CPU count.")
parser.add_argument("--load-list", default=None,
help="Only run tests listed in the named file."),
parser.add_argument("--partial", action="store_true", default=False,
help="DEPRECATED: Only some tests will be run. "
"Implied by --failing. This option is deprecated "
"and no longer does anything. It will be removed "
"in the future")
parser.add_argument("--subunit", action="store_true", default=False,
help="Display results in subunit format.")
parser.add_argument("--until-failure", action="store_true",
default=False,
help="Repeat the run again and again until "
"failure occurs.")
parser.add_argument("--analyze-isolation", action="store_true",
default=False,
help="Search the last test run for 2-test test "
"isolation interactions.")
parser.add_argument("--isolated", action="store_true",
default=False,
help="Run each test id in a separate test runner.")
parser.add_argument("--worker-file", action="store", default=None,
dest='worker_path',
help="Optional path of a manual worker grouping "
"file to use for the run")
parser.add_argument('--blacklist-file', '-b',
default=None, dest='blacklist_file',
                            help='Path to a blacklist file; this file '
                            'contains a separate regex exclude on each '
                            'newline.')
parser.add_argument('--whitelist-file', '-w',
default=None, dest='whitelist_file',
                            help='Path to a whitelist file; this file '
                            'contains a separate regex on each newline.')
parser.add_argument('--black-regex', '-B', default=None,
dest='black_regex',
                            help='Test rejection regex. If a test case name '
                            'matches on a re.search() operation, it will be '
                            'removed from the final test list. Effectively '
                            'the black regex is added to the black regex '
                            'list, but you do not need to edit a file. The '
                            'black filtering happens after the initial '
                            'white selection, which by default is '
                            'everything.')
parser.add_argument('--no-discover', '-n', default=None,
metavar='TEST_ID',
help="Takes in a single test to bypasses test "
"discover and just execute the test specified. A "
"file may be used in place of a test name.")
parser.add_argument('--random', '-r', action="store_true",
default=False,
help="Randomize the test order after they are "
"partitioned into separate workers")
parser.add_argument('--combine', action='store_true', default=False,
help="Combine the results from the test run with "
"the last run in the repository")
parser.add_argument('--no-subunit-trace', action='store_true',
default=False,
help='Disable the default subunit-trace output '
'filter')
parser.add_argument('--force-subunit-trace', action='store_true',
default=False,
                            help='Force subunit-trace output regardless of '
                            'any other options or config settings')
parser.add_argument('--color', action='store_true', default=False,
                            help='Enable color output in the subunit-trace '
                            'output, if subunit-trace output is enabled '
                            '(which is the default). If subunit-trace is '
                            'disabled this does nothing.')
parser.add_argument('--slowest', action='store_true', default=False,
help='After the test run, print the slowest '
'tests.')
parser.add_argument('--abbreviate', action='store_true',
dest='abbreviate',
help='Print one character status for each test')
parser.add_argument('--suppress-attachments', action='store_true',
dest='suppress_attachments',
                            help='If set, do not print stdout or stderr '
                            'attachment contents on a successful test '
                            'execution')
return parser
    def get_description(self):
        help_str = ("Run the tests for a project and load them into a "
                    "repository.")
        return help_str
    def take_action(self, parsed_args):
user_conf = user_config.get_user_config(self.app_args.user_config)
filters = parsed_args.filters
args = parsed_args
if getattr(user_conf, 'run', False):
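            # Fold the user config file settings in with the CLI arguments;
            # an option set in either place takes effect.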
            pretty_out = not (user_conf.run.get('no-subunit-trace')
                              or args.no_subunit_trace)
pretty_out = args.force_subunit_trace or pretty_out
if args.concurrency is None:
concurrency = user_conf.run.get('concurrency', 0)
else:
concurrency = args.concurrency
random = args.random or user_conf.run.get('random', False)
color = args.color or user_conf.run.get('color', False)
abbreviate = args.abbreviate or user_conf.run.get(
'abbreviate', False)
suppress_attachments = (
args.suppress_attachments or user_conf.run.get(
'suppress-attachments', False))
else:
pretty_out = args.force_subunit_trace or not args.no_subunit_trace
concurrency = args.concurrency or 0
random = args.random
color = args.color
abbreviate = args.abbreviate
suppress_attachments = args.suppress_attachments
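        # cliff sets verbose_level to 0 for --quiet; discard all output in
        # that case.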
verbose_level = self.app.options.verbose_level
stdout = open(os.devnull, 'w') if verbose_level == 0 else sys.stdout
result = run_command(
config=self.app_args.config, repo_type=self.app_args.repo_type,
repo_url=self.app_args.repo_url,
test_path=self.app_args.test_path, top_dir=self.app_args.top_dir,
group_regex=self.app_args.group_regex, failing=args.failing,
serial=args.serial, concurrency=concurrency,
load_list=args.load_list, partial=args.partial,
subunit_out=args.subunit, until_failure=args.until_failure,
analyze_isolation=args.analyze_isolation, isolated=args.isolated,
worker_path=args.worker_path, blacklist_file=args.blacklist_file,
whitelist_file=args.whitelist_file, black_regex=args.black_regex,
no_discover=args.no_discover, random=random,
combine=args.combine,
filters=filters, pretty_out=pretty_out, color=color,
stdout=stdout, abbreviate=abbreviate,
suppress_attachments=suppress_attachments)
# Always output slowest test info if requested, regardless of other
# test run options
user_slowest = False
if getattr(user_conf, 'run', False):
user_slowest = user_conf.run.get('slowest', False)
if args.slowest or user_slowest:
slowest.slowest(repo_type=self.app_args.repo_type,
repo_url=self.app_args.repo_url)
return result
def _find_failing(repo):
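    """Return the ids of the tests recorded as failing in the repository."""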
run = repo.get_failing()
case = run.get_test()
ids = []
def gather_errors(test_dict):
if test_dict['status'] == 'fail':
ids.append(test_dict['id'])
result = testtools.StreamToDict(gather_errors)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
return ids
def run_command(config='.stestr.conf', repo_type='file',
repo_url=None, test_path=None, top_dir=None, group_regex=None,
failing=False, serial=False, concurrency=0, load_list=None,
partial=False, subunit_out=False, until_failure=False,
analyze_isolation=False, isolated=False, worker_path=None,
blacklist_file=None, whitelist_file=None, black_regex=None,
no_discover=False, random=False, combine=False, filters=None,
pretty_out=True, color=False, stdout=sys.stdout,
abbreviate=False, suppress_attachments=False):
"""Function to execute the run command
This function implements the run command. It will run the tests specified
in the parameters based on the provided config file and/or arguments
specified in the way specified by the arguments. The results will be
printed to STDOUT and loaded into the repository.
:param str config: The path to the stestr config file. Must be a string.
:param str repo_type: This is the type of repository to use. Valid choices
are 'file' and 'sql'.
:param str repo_url: The url of the repository to use.
:param str test_path: Set the test path to use for unittest discovery.
If both this and the corresponding config file option are set, this
value will be used.
:param str top_dir: The top dir to use for unittest discovery. This takes
precedence over the value in the config file. (if one is present in
the config file)
:param str group_regex: Set a group regex to use for grouping tests
together in the stestr scheduler. If both this and the corresponding
config file option are set this value will be used.
:param bool failing: Run only tests known to be failing.
:param bool serial: Run tests serially
    :param int concurrency: How many processes to use. The default (0)
        autodetects your CPU count and uses that.
:param str load_list: The path to a list of test_ids. If specified only
tests listed in the named file will be run.
    :param bool partial: DEPRECATED: Only some tests will be run. Implied by
        `--failing`. This flag is deprecated because it no longer does
        anything; it will be removed in a future release.
:param bool subunit_out: Display results in subunit format.
:param bool until_failure: Repeat the run again and again until failure
occurs.
:param bool analyze_isolation: Search the last test run for 2-test test
isolation interactions.
:param bool isolated: Run each test id in a separate test runner.
:param str worker_path: Optional path of a manual worker grouping file
to use for the run.
    :param str blacklist_file: Path to a blacklist file; this file contains a
        separate regex exclude on each newline.
    :param str whitelist_file: Path to a whitelist file; this file contains a
        separate regex on each newline.
    :param str black_regex: Test rejection regex. If a test case name matches
        on a re.search() operation, it will be removed from the final test
        list.
    :param str no_discover: Takes in a single test_id to bypass test
        discovery and just execute the specified test. A file name may be
        used in place of a test name.
:param bool random: Randomize the test order after they are partitioned
into separate workers
:param bool combine: Combine the results from the test run with the
last run in the repository
:param list filters: A list of string regex filters to initially apply on
the test list. Tests that match any of the regexes will be used.
(assuming any other filtering specified also uses it)
:param bool pretty_out: Use the subunit-trace output filter
:param bool color: Enable colorized output in subunit-trace
:param file stdout: The file object to write all output to. By default this
is sys.stdout
:param bool abbreviate: Use abbreviated output if set true
    :param bool suppress_attachments: When set true subunit-trace will not
        print attachments on successful test execution.
:return return_code: The exit code for the command. 0 for success and > 0
for failures.
:rtype: int
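
    A minimal usage sketch (assuming this module is importable as
    ``stestr.commands.run`` and a ``.stestr.conf`` exists in the current
    working directory)::

        from stestr.commands.run import run_command

        # Run only the tests whose ids match the 'compute' regex and
        # report the exit code for the run
        return_code = run_command(filters=['compute'])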
"""
if partial:
warnings.warn('The partial flag is deprecated and has no effect '
'anymore')
try:
repo = util.get_repo_open(repo_type, repo_url)
    # If a repo is not found and a stestr config file exists, just create it
except repository.RepositoryNotFound:
if not os.path.isfile(config) and not test_path:
msg = ("No config file found and --test-path not specified. "
"Either create or specify a .stestr.conf or use "
"--test-path ")
stdout.write(msg)
exit(1)
repo = util.get_repo_initialise(repo_type, repo_url)
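    # When combining results, reuse the id of the latest run in the
    # repository so the new results are merged into that run.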
combine_id = None
if combine:
latest_id = repo.latest_id()
combine_id = six.text_type(latest_id)
if no_discover:
ids = no_discover
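        # A file path (e.g. tests/test_foo.py) is converted to a dotted
        # module path (tests.test_foo) so subunit.run can import it.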
if ids.find('/') != -1:
root, _ = os.path.splitext(ids)
ids = root.replace('/', '.')
run_cmd = 'python -m subunit.run ' + ids
def run_tests():
run_proc = [('subunit', output.ReturnCodeToSubunit(
subprocess.Popen(run_cmd, shell=True,
stdout=subprocess.PIPE)))]
return load.load(in_streams=run_proc,
subunit_out=subunit_out,
repo_type=repo_type,
repo_url=repo_url, run_id=combine_id,
pretty_out=pretty_out,
color=color, stdout=stdout, abbreviate=abbreviate,
suppress_attachments=suppress_attachments)
if not until_failure:
return run_tests()
else:
while True:
result = run_tests()
# If we're using subunit output we want to make sure to check
# the result from the repository because load() returns 0
# always on subunit output
            if subunit_out:
summary = testtools.StreamSummary()
last_run = repo.get_latest_run().get_subunit_stream()
stream = subunit.ByteStreamToStreamResult(last_run)
summary.startTestRun()
try:
stream.run(summary)
finally:
summary.stopTestRun()
if not summary.wasSuccessful():
result = 1
if result:
return result
if failing or analyze_isolation:
ids = _find_failing(repo)
else:
ids = None
if load_list:
list_ids = set()
# Should perhaps be text.. currently does its own decode.
with open(load_list, 'rb') as list_file:
list_ids = set(parse_list(list_file.read()))
if ids is None:
# Use the supplied list verbatim
ids = list_ids
else:
# We have some already limited set of ids, just reduce to ids
# that are both failing and listed.
ids = list_ids.intersection(ids)
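    # Build the actual test run command from the stestr config file plus the
    # options gathered above.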
conf = config_file.TestrConf(config)
if not analyze_isolation:
cmd = conf.get_run_command(
ids, regexes=filters, group_regex=group_regex, repo_type=repo_type,
repo_url=repo_url, serial=serial, worker_path=worker_path,
concurrency=concurrency, blacklist_file=blacklist_file,
whitelist_file=whitelist_file, black_regex=black_regex,
top_dir=top_dir, test_path=test_path, randomize=random)
if isolated:
result = 0
cmd.setUp()
try:
ids = cmd.list_tests()
finally:
cmd.cleanUp()
for test_id in ids:
# TODO(mtreinish): add regex
cmd = conf.get_run_command(
[test_id], filters, group_regex=group_regex,
repo_type=repo_type, repo_url=repo_url, serial=serial,
worker_path=worker_path, concurrency=concurrency,
blacklist_file=blacklist_file,
whitelist_file=whitelist_file, black_regex=black_regex,
randomize=random, test_path=test_path, top_dir=top_dir)
run_result = _run_tests(
cmd, failing, analyze_isolation, isolated, until_failure,
subunit_out=subunit_out, combine_id=combine_id,
repo_type=repo_type, repo_url=repo_url,
pretty_out=pretty_out, color=color, abbreviate=abbreviate,
stdout=stdout, suppress_attachments=suppress_attachments)
if run_result > result:
result = run_result
return result
else:
return _run_tests(cmd, failing, analyze_isolation,
isolated, until_failure,
subunit_out=subunit_out,
combine_id=combine_id,
repo_type=repo_type,
repo_url=repo_url,
pretty_out=pretty_out,
color=color,
stdout=stdout,
abbreviate=abbreviate,
suppress_attachments=suppress_attachments)
else:
# Where do we source data about the cause of conflicts.
latest_run = repo.get_latest_run()
# Stage one: reduce the list of failing tests (possibly further
# reduced by testfilters) to eliminate fails-on-own tests.
spurious_failures = set()
for test_id in ids:
            # TODO(mtreinish): Add regex
cmd = conf.get_run_command(
[test_id], group_regex=group_regex, repo_type=repo_type,
repo_url=repo_url, serial=serial, worker_path=worker_path,
concurrency=concurrency, blacklist_file=blacklist_file,
whitelist_file=whitelist_file, black_regex=black_regex,
randomize=random, test_path=test_path,
top_dir=top_dir)
if not _run_tests(cmd, failing, analyze_isolation, isolated,
until_failure):
# If the test was filtered, it won't have been run.
if test_id in repo.get_test_ids(repo.latest_id()):
spurious_failures.add(test_id)
                # This is arguably ugly, why not just tell the system that
                # a pass here isn't a real pass? (So that when we find a
                # test that is spuriously failing, we don't forget that it
                # is actually failing.)
# Alternatively, perhaps this is a case for data mining:
# when a test starts passing, keep a journal, and allow
# digging back in time to see that it was a failure,
# what it failed with etc...
# The current solution is to just let it get marked as
# a pass temporarily.
if not spurious_failures:
# All done.
return 0
bisect_runner = bisect_tests.IsolationAnalyzer(
latest_run, conf, _run_tests, repo, test_path=test_path,
top_dir=top_dir, group_regex=group_regex, repo_type=repo_type,
repo_url=repo_url, serial=serial, concurrency=concurrency)
# spurious-failure -> cause.
return bisect_runner.bisect_tests(spurious_failures)
def _run_tests(cmd, failing, analyze_isolation, isolated, until_failure,
subunit_out=False, combine_id=None, repo_type='file',
repo_url=None, pretty_out=True, color=False, stdout=sys.stdout,
abbreviate=False, suppress_attachments=False):
"""Run the tests cmd was parameterised with."""
cmd.setUp()
try:
def run_tests():
run_procs = [('subunit',
output.ReturnCodeToSubunit(
proc)) for proc in cmd.run_tests()]
if not run_procs:
stdout.write("The specified regex doesn't match with anything")
return 1
return load.load((None, None), in_streams=run_procs,
subunit_out=subunit_out,
repo_type=repo_type,
repo_url=repo_url, run_id=combine_id,
pretty_out=pretty_out, color=color, stdout=stdout,
abbreviate=abbreviate,
suppress_attachments=suppress_attachments)
if not until_failure:
return run_tests()
else:
while True:
result = run_tests()
# If we're using subunit output we want to make sure to check
# the result from the repository because load() returns 0
# always on subunit output
if subunit_out:
repo = util.get_repo_open(repo_type, repo_url)
summary = testtools.StreamSummary()
last_run = repo.get_latest_run().get_subunit_stream()
stream = subunit.ByteStreamToStreamResult(last_run)
summary.startTestRun()
try:
stream.run(summary)
finally:
summary.stopTestRun()
if not summary.wasSuccessful():
result = 1
if result:
return result
finally:
cmd.cleanUp()