Shofel2_T124_python/venv/lib/python3.10/site-packages/gevent/testing/testrunner.py

974 lines
35 KiB
Python
Raw Permalink Normal View History

2024-05-25 16:45:07 +00:00
#!/usr/bin/env python
from __future__ import print_function, absolute_import, division
import re
import sys
import os
import glob
import operator
import traceback
import importlib
from contextlib import contextmanager
from datetime import timedelta
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
from gevent._util import Lazy
from . import util
from .resources import parse_resources
from .resources import setup_resources
from .resources import unparse_resources
from .sysinfo import RUNNING_ON_CI
from .sysinfo import PYPY
from .sysinfo import PY2
from .sysinfo import RESOLVER_ARES
from .sysinfo import RUN_LEAKCHECKS
from .sysinfo import OSX
from . import six
from . import travis
# Import this while we're probably single-threaded/single-processed
# to try to avoid issues with PyPy 5.10.
# See https://bitbucket.org/pypy/pypy/issues/2769/systemerror-unexpected-internal-exception
try:
__import__('_testcapi')
except (ImportError, OSError):
# This can raise a wide variety of errors
pass
TIMEOUT = 100 # seconds
AVAIL_NWORKERS = cpu_count() - 1
DEFAULT_NWORKERS = int(os.environ.get('NWORKERS') or max(AVAIL_NWORKERS, 4))
if DEFAULT_NWORKERS > 15:
DEFAULT_NWORKERS = 10
if RUN_LEAKCHECKS:
# Capturing the stats takes time, and we run each
# test at least twice
TIMEOUT = 200
DEFAULT_RUN_OPTIONS = {
'timeout': TIMEOUT
}
if RUNNING_ON_CI:
# Too many and we get spurious timeouts
DEFAULT_NWORKERS = 4 if not OSX else 2
def _package_relative_filename(filename, package):
if not os.path.isfile(filename) and package:
# Ok, try to locate it as a module in the package
package_dir = _dir_from_package_name(package)
return os.path.join(package_dir, filename)
return filename
def _dir_from_package_name(package):
package_mod = importlib.import_module(package)
package_dir = os.path.dirname(package_mod.__file__)
return package_dir
class ResultCollector(object):
def __init__(self):
self.total = 0
self.failed = {}
self.passed = {}
self.total_cases = 0
self.total_skipped = 0
# Every RunResult reported: failed, passed, rerun
self._all_results = []
self.reran = {}
def __iadd__(self, result):
self._all_results.append(result)
if not result:
self.failed[result.name] = result #[cmd, kwargs]
else:
self.passed[result.name] = True
self.total_cases += result.run_count
self.total_skipped += result.skipped_count
return self
def __ilshift__(self, result):
"""
collector <<= result
Stores the result, but does not count it towards
the number of cases run, skipped, passed or failed.
"""
self._all_results.append(result)
self.reran[result.name] = result
return self
@property
def longest_running_tests(self):
"""
A new list of RunResult objects, sorted from longest running
to shortest running.
"""
return sorted(self._all_results,
key=operator.attrgetter('run_duration'),
reverse=True)
class FailFast(Exception):
pass
class Runner(object):
TIME_WAIT_REAP = 0.1
TIME_WAIT_SPAWN = 0.05
def __init__(self,
tests,
configured_failing_tests=(),
failfast=False,
quiet=False,
configured_run_alone_tests=(),
worker_count=DEFAULT_NWORKERS,
second_chance=False):
"""
:keyword quiet: Set to True or False to explicitly choose. Set to
`None` to use the default, which may come from the environment variable
``GEVENTTEST_QUIET``.
"""
self._tests = tests
self._configured_failing_tests = configured_failing_tests
self._quiet = quiet
self._configured_run_alone_tests = configured_run_alone_tests
assert not (failfast and second_chance)
self._failfast = failfast
self._second_chance = second_chance
self.results = ResultCollector()
self.results.total = len(self._tests)
self._running_jobs = []
self._worker_count = min(len(tests), worker_count) or 1
def _run_one(self, cmd, **kwargs):
if self._quiet is not None:
kwargs['quiet'] = self._quiet
result = util.run(cmd, **kwargs)
if not result and self._second_chance:
self.results <<= result
util.log("> %s", result.name, color='warning')
result = util.run(cmd, **kwargs)
if not result and self._failfast:
# Under Python 3.9 (maybe older versions?), raising the
# SystemExit here (a background thread belonging to the
# pool) doesn't seem to work well. It gets stuck waiting
# for a lock? The job never shows up as finished.
raise FailFast(cmd)
self.results += result
def _reap(self):
"Clean up the list of running jobs, returning how many are still outstanding."
for r in self._running_jobs[:]:
if not r.ready():
continue
if r.successful():
self._running_jobs.remove(r)
else:
r.get()
sys.exit('Internal error in testrunner.py: %r' % (r, ))
return len(self._running_jobs)
def _reap_all(self):
util.log("Reaping %d jobs", len(self._running_jobs), color="debug")
while self._running_jobs:
if not self._reap():
break
util.sleep(self.TIME_WAIT_REAP)
def _spawn(self, pool, cmd, options):
while True:
if self._reap() < self._worker_count:
job = pool.apply_async(self._run_one, (cmd, ), options or {})
self._running_jobs.append(job)
return
util.sleep(self.TIME_WAIT_SPAWN)
def __call__(self):
util.log("Running tests in parallel with concurrency %s %s." % (
self._worker_count,
util._colorize('number', '(concurrency available: %d)' % AVAIL_NWORKERS)
),)
# Setting global state, in theory we can be used multiple times.
# This is fine as long as we are single threaded and call these
# sequentially.
util.BUFFER_OUTPUT = self._worker_count > 1 or self._quiet
start = util.perf_counter()
try:
self._run_tests()
except KeyboardInterrupt:
self._report(util.perf_counter() - start, exit=False)
util.log('(partial results)\n')
raise
except:
traceback.print_exc()
raise
self._reap_all()
self._report(util.perf_counter() - start, exit=True)
def _run_tests(self):
"Runs the tests, produces no report."
run_alone = []
tests = self._tests
pool = ThreadPool(self._worker_count)
try:
for cmd, options in tests:
options = options or {}
if matches(self._configured_run_alone_tests, cmd):
run_alone.append((cmd, options))
else:
self._spawn(pool, cmd, options)
pool.close()
pool.join()
if run_alone:
util.log("Running tests marked standalone")
for cmd, options in run_alone:
self._run_one(cmd, **options)
except KeyboardInterrupt:
try:
util.log('Waiting for currently running to finish...')
self._reap_all()
except KeyboardInterrupt:
pool.terminate()
raise
except:
pool.terminate()
raise
def _report(self, elapsed_time, exit=False):
results = self.results
report(
results,
exit=exit,
took=elapsed_time,
configured_failing_tests=self._configured_failing_tests,
)
class TravisFoldingRunner(object):
def __init__(self, runner, travis_fold_msg):
self._runner = runner
self._travis_fold_msg = travis_fold_msg
self._travis_fold_name = str(int(util.perf_counter()))
# A zope-style acquisition proxy would be convenient here.
run_tests = runner._run_tests
def _run_tests():
self._begin_fold()
try:
run_tests()
finally:
self._end_fold()
runner._run_tests = _run_tests
def _begin_fold(self):
travis.fold_start(self._travis_fold_name,
self._travis_fold_msg)
def _end_fold(self):
travis.fold_end(self._travis_fold_name)
def __call__(self):
return self._runner()
class Discovery(object):
package_dir = None
package = None
def __init__(
self,
tests=None,
ignore_files=None,
ignored=(),
coverage=False,
package=None,
config=None,
allow_combine=True,
):
self.config = config or {}
self.ignore = set(ignored or ())
self.tests = tests
self.configured_test_options = config.get('TEST_FILE_OPTIONS', set())
self.allow_combine = allow_combine
if ignore_files:
ignore_files = ignore_files.split(',')
for f in ignore_files:
self.ignore.update(set(load_list_from_file(f, package)))
if coverage:
self.ignore.update(config.get('IGNORE_COVERAGE', set()))
if package:
self.package = package
self.package_dir = _dir_from_package_name(package)
class Discovered(object):
def __init__(self, package, configured_test_options, ignore, config, allow_combine):
self.orig_dir = os.getcwd()
self.configured_run_alone = config['RUN_ALONE']
self.configured_failing_tests = config['FAILING_TESTS']
self.package = package
self.configured_test_options = configured_test_options
self.allow_combine = allow_combine
self.ignore = ignore
self.to_import = []
self.std_monkey_patch_files = []
self.no_monkey_patch_files = []
self.commands = []
@staticmethod
def __makes_simple_monkey_patch(
contents,
_patch_present=re.compile(br'[^#].*patch_all\(\)'),
_patch_indented=re.compile(br' .*patch_all\(\)')
):
return (
# A non-commented patch_all() call is present
bool(_patch_present.search(contents))
# that is not indented (because that implies its not at the top-level,
# so some preconditions are being set)
and not _patch_indented.search(contents)
)
@staticmethod
def __file_allows_monkey_combine(contents):
return b'testrunner-no-monkey-combine' not in contents
@staticmethod
def __file_allows_combine(contents):
return b'testrunner-no-combine' not in contents
@staticmethod
def __calls_unittest_main_toplevel(
contents,
_greentest_main=re.compile(br' greentest.main\(\)'),
_unittest_main=re.compile(br' unittest.main\(\)'),
_import_main=re.compile(br'from gevent.testing import.*main'),
_main=re.compile(br' main\(\)'),
):
# TODO: Add a check that this comes in a line directly after
# if __name__ == __main__.
return (
_greentest_main.search(contents)
or _unittest_main.search(contents)
or (_import_main.search(contents) and _main.search(contents))
)
def __has_config(self, filename):
return (
RUN_LEAKCHECKS
or filename in self.configured_test_options
or filename in self.configured_run_alone
or matches(self.configured_failing_tests, filename)
)
def __can_monkey_combine(self, filename, contents):
return (
self.allow_combine
and not self.__has_config(filename)
and self.__makes_simple_monkey_patch(contents)
and self.__file_allows_monkey_combine(contents)
and self.__file_allows_combine(contents)
and self.__calls_unittest_main_toplevel(contents)
)
@staticmethod
def __makes_no_monkey_patch(contents, _patch_present=re.compile(br'[^#].*patch_\w*\(')):
return not _patch_present.search(contents)
def __can_nonmonkey_combine(self, filename, contents):
return (
self.allow_combine
and not self.__has_config(filename)
and self.__makes_no_monkey_patch(contents)
and self.__file_allows_combine(contents)
and self.__calls_unittest_main_toplevel(contents)
)
def __begin_command(self):
cmd = [sys.executable, '-u']
# XXX: -X track-resources is broken. This happened when I updated to
# PyPy 7.3.2. It started failing to even start inside the virtual environment
# with
#
# debug: OperationError:
# debug: operror-type: ImportError
# debug: operror-value: No module named traceback
#
# I don't know if this is PyPy's problem or a problem in virtualenv:
#
# virtualenv==20.0.35
# virtualenv-clone==0.5.4
# virtualenvwrapper==4.8.4
#
# Deferring investigation until I need this...
# if PYPY and PY2:
# # Doesn't seem to be an env var for this.
# # XXX: track-resources is broken in virtual environments
# # on 7.3.2.
# cmd.extend(('-X', 'track-resources'))
return cmd
def __add_test(self, qualified_name, filename, contents):
if b'TESTRUNNER' in contents: # test__monkey_patching.py
# XXX: Rework this to avoid importing.
# XXX: Rework this to allow test combining (it could write the files out and return
# them directly; we would use 'python -m gevent.monkey --module unittest ...)
self.to_import.append(qualified_name)
elif self.__can_monkey_combine(filename, contents):
self.std_monkey_patch_files.append(qualified_name if self.package else filename)
elif self.__can_nonmonkey_combine(filename, contents):
self.no_monkey_patch_files.append(qualified_name if self.package else filename)
else:
# XXX: For simple python module tests, try this with
# `runpy.run_module`, very similar to the way we run
# things for monkey patching. The idea here is that we
# can perform setup ahead of time (e.g.,
# setup_resources()) in each test without having to do
# it manually or force calls or modifications to those
# tests.
cmd = self.__begin_command()
if self.package:
# Using a package is the best way to work with coverage 5
# when we specify 'source = <package>'
cmd.append('-m' + qualified_name)
else:
cmd.append(filename)
options = DEFAULT_RUN_OPTIONS.copy()
options.update(self.configured_test_options.get(filename, {}))
self.commands.append((cmd, options))
@staticmethod
def __remove_options(lst):
return [x for x in lst if x and not x.startswith('-')]
def __expand_imports(self):
for qualified_name in self.to_import:
module = importlib.import_module(qualified_name)
for cmd, options in module.TESTRUNNER():
if self.__remove_options(cmd)[-1] in self.ignore:
continue
self.commands.append((cmd, options))
del self.to_import[:]
def __combine_commands(self, files, group_size=5):
if not files:
return
from itertools import groupby
cnt = [0, 0]
def make_group(_):
if cnt[0] > group_size:
cnt[0] = 0
cnt[1] += 1
cnt[0] += 1
return cnt[1]
for _, group in groupby(files, make_group):
cmd = self.__begin_command()
cmd.append('-m')
cmd.append('unittest')
# cmd.append('-v')
for name in group:
cmd.append(name)
self.commands.insert(0, (cmd, DEFAULT_RUN_OPTIONS.copy()))
del files[:]
def visit_file(self, filename):
# Support either 'gevent.tests.foo' or 'gevent/tests/foo.py'
if filename.startswith('gevent.tests'):
# XXX: How does this interact with 'package'? Probably not well
qualified_name = module_name = filename
filename = filename[len('gevent.tests') + 1:]
filename = filename.replace('.', os.sep) + '.py'
else:
module_name = os.path.splitext(filename)[0]
qualified_name = self.package + '.' + module_name if self.package else module_name
# Also allow just 'foo' as a shortcut for 'gevent.tests.foo'
abs_filename = os.path.abspath(filename)
if (
not os.path.exists(abs_filename)
and not filename.endswith('.py')
and os.path.exists(abs_filename + '.py') ):
abs_filename += '.py'
with open(abs_filename, 'rb') as f:
# Some of the test files (e.g., test__socket_dns) are
# UTF8 encoded. Depending on the environment, Python 3 may
# try to decode those as ASCII, which fails with UnicodeDecodeError.
# Thus, be sure to open and compare in binary mode.
# Open the absolute path to make errors more clear,
# but we can't store the absolute path, our configuration is based on
# relative file names.
contents = f.read()
self.__add_test(qualified_name, filename, contents)
def visit_files(self, filenames):
for filename in filenames:
self.visit_file(filename)
with Discovery._in_dir(self.orig_dir):
self.__expand_imports()
self.__combine_commands(self.std_monkey_patch_files)
self.__combine_commands(self.no_monkey_patch_files)
@staticmethod
@contextmanager
def _in_dir(package_dir):
olddir = os.getcwd()
if package_dir:
os.chdir(package_dir)
try:
yield
finally:
os.chdir(olddir)
@Lazy
def discovered(self):
tests = self.tests
discovered = self.Discovered(self.package, self.configured_test_options,
self.ignore, self.config, self.allow_combine)
# We need to glob relative names, our config is based on filenames still
with self._in_dir(self.package_dir):
if not tests:
tests = set(glob.glob('test_*.py')) - set(['test_support.py'])
else:
tests = set(tests)
if self.ignore:
# Always ignore the designated list, even if tests
# were specified on the command line. This fixes a
# nasty interaction with
# test__threading_vs_settrace.py being run under
# coverage when 'grep -l subprocess test*py' is used
# to list the tests to run.
tests -= self.ignore
tests = sorted(tests)
discovered.visit_files(tests)
return discovered
def __iter__(self):
return iter(self.discovered.commands) # pylint:disable=no-member
def __len__(self):
return len(self.discovered.commands) # pylint:disable=no-member
def load_list_from_file(filename, package):
result = []
if filename:
# pylint:disable=unspecified-encoding
with open(_package_relative_filename(filename, package)) as f:
for x in f:
x = x.split('#', 1)[0].strip()
if x:
result.append(x)
return result
def matches(possibilities, command, include_flaky=True):
if isinstance(command, list):
command = ' '.join(command)
for line in possibilities:
if not include_flaky and line.startswith('FLAKY '):
continue
line = line.replace('FLAKY ', '')
# Our configs are still mostly written in terms of file names,
# but the non-monkey tests are now using package names.
# Strip off '.py' from filenames to see if we match a module.
# XXX: This could be much better. Our command needs better structure.
if command.endswith(' ' + line) or command.endswith(line.replace(".py", '')):
return True
if ' ' not in command and command == line:
return True
return False
def format_seconds(seconds):
if seconds < 20:
return '%.1fs' % seconds
seconds = str(timedelta(seconds=round(seconds)))
if seconds.startswith('0:'):
seconds = seconds[2:]
return seconds
def _show_longest_running(result_collector, how_many=5):
longest_running_tests = result_collector.longest_running_tests
if not longest_running_tests:
return
# The only tricky part is handling repeats. we want to show them,
# but not count them as a distinct entry.
util.log('\nLongest-running tests:')
length_of_longest_formatted_decimal = len('%.1f' % longest_running_tests[0].run_duration)
frmt = '%' + str(length_of_longest_formatted_decimal) + '.1f seconds: %s'
seen_names = set()
for result in longest_running_tests:
util.log(frmt, result.run_duration, result.name)
seen_names.add(result.name)
if len(seen_names) >= how_many:
break
def report(result_collector, # type: ResultCollector
exit=True, took=None,
configured_failing_tests=()):
# pylint:disable=redefined-builtin,too-many-branches,too-many-locals
total = result_collector.total
failed = result_collector.failed
passed = result_collector.passed
total_cases = result_collector.total_cases
total_skipped = result_collector.total_skipped
_show_longest_running(result_collector)
if took:
took = ' in %s' % format_seconds(took)
else:
took = ''
failed_expected = []
failed_unexpected = []
passed_unexpected = []
for name in passed:
if matches(configured_failing_tests, name, include_flaky=False):
passed_unexpected.append(name)
if passed_unexpected:
util.log('\n%s/%s unexpected passes', len(passed_unexpected), total, color='error')
print_list(passed_unexpected)
if result_collector.reran:
util.log('\n%s/%s tests rerun', len(result_collector.reran), total, color='warning')
print_list(result_collector.reran)
if failed:
util.log('\n%s/%s tests failed%s', len(failed), total, took, color='warning')
for name in failed:
if matches(configured_failing_tests, name, include_flaky=True):
failed_expected.append(name)
else:
failed_unexpected.append(name)
if failed_expected:
util.log('\n%s/%s expected failures', len(failed_expected), total, color='warning')
print_list(failed_expected)
if failed_unexpected:
util.log('\n%s/%s unexpected failures', len(failed_unexpected), total, color='error')
print_list(failed_unexpected)
util.log(
'\nRan %s tests%s in %s files%s',
total_cases,
util._colorize('skipped', " (skipped=%d)" % total_skipped) if total_skipped else '',
total,
took,
)
if exit:
if failed_unexpected:
sys.exit(min(100, len(failed_unexpected)))
if passed_unexpected:
sys.exit(101)
if total <= 0:
sys.exit('No tests found.')
def print_list(lst):
for name in lst:
util.log(' - %s', name)
def _setup_environ(debug=False):
def not_set(key):
return not bool(os.environ.get(key))
if (not_set('PYTHONWARNINGS')
and (not sys.warnoptions
# Python 3.7 goes from [] to ['default'] for nothing
or sys.warnoptions == ['default'])):
# action:message:category:module:line
# - when a warning matches
# more than one option, the action for the last matching
# option is performed.
# - action is one of : ignore, default, all, module, once, error
# Enable default warnings such as ResourceWarning.
# ResourceWarning doesn't exist on Py2, so don't put it
# in there to avoid a warnnig.
defaults = [
'default',
'default::DeprecationWarning',
]
if not PY2:
defaults.append('default::ResourceWarning')
os.environ['PYTHONWARNINGS'] = ','.join(defaults + [
# On Python 3[.6], the system site.py module has
# "open(fullname, 'rU')" which produces the warning that
# 'U' is deprecated, so ignore warnings from site.py
'ignore:::site:',
# pkgutil on Python 2 complains about missing __init__.py
'ignore:::pkgutil:',
# importlib/_bootstrap.py likes to spit out "ImportWarning:
# can't resolve package from __spec__ or __package__, falling
# back on __name__ and __path__". I have no idea what that means, but it seems harmless
# and is annoying.
'ignore:::importlib._bootstrap:',
'ignore:::importlib._bootstrap_external:',
# importing ABCs from collections, not collections.abc
'ignore:::pkg_resources._vendor.pyparsing:',
'ignore:::dns.namedict:',
# dns.hash itself is being deprecated, importing it raises the warning;
# we don't import it, but dnspython still does
'ignore:::dns.hash:',
# dns.zone uses some raw regular expressions
# without the r'' syntax, leading to DeprecationWarning: invalid
# escape sequence. This is fixed in 2.0 (Python 3 only).
'ignore:::dns.zone:',
])
if not_set('PYTHONFAULTHANDLER'):
os.environ['PYTHONFAULTHANDLER'] = 'true'
if not_set('GEVENT_DEBUG') and debug:
os.environ['GEVENT_DEBUG'] = 'debug'
if not_set('PYTHONTRACEMALLOC') and debug:
# This slows the tests down quite a bit. Reserve
# for debugging.
os.environ['PYTHONTRACEMALLOC'] = '10'
if not_set('PYTHONDEVMODE'):
# Python 3.7 and above.
os.environ['PYTHONDEVMODE'] = '1'
if not_set('PYTHONMALLOC') and debug:
# Python 3.6 and above.
# This slows the tests down some, but
# can detect memory corruption. Unfortunately
# it can also be flaky, especially in pre-release
# versions of Python (e.g., lots of crashes on Python 3.8b4).
os.environ['PYTHONMALLOC'] = 'debug'
if sys.version_info.releaselevel != 'final' and not debug:
os.environ['PYTHONMALLOC'] = 'default'
os.environ['PYTHONDEVMODE'] = ''
interesting_envs = {
k: os.environ[k]
for k in os.environ
if k.startswith(('PYTHON', 'GEVENT'))
}
widest_k = max(len(k) for k in interesting_envs)
for k, v in sorted(interesting_envs.items()):
util.log('%*s\t=\t%s', widest_k, k, v, color="debug")
def main():
# pylint:disable=too-many-locals,too-many-statements
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--ignore')
parser.add_argument(
'--discover', action='store_true',
help="Only print the tests found."
)
parser.add_argument(
'--config', default='known_failures.py',
help="The path to the config file containing "
"FAILING_TESTS, IGNORED_TESTS and RUN_ALONE. "
"Defaults to %(default)s."
)
parser.add_argument(
"--coverage", action="store_true",
help="Enable coverage recording with coverage.py."
)
# TODO: Quiet and verbose should be mutually exclusive
parser.add_argument(
"--quiet", action="store_true", default=True,
help="Be quiet. Defaults to %(default)s. Also the "
"GEVENTTEST_QUIET environment variable."
)
parser.add_argument("--verbose", action="store_false", dest='quiet')
parser.add_argument(
"--debug", action="store_true", default=False,
help="Enable debug settings. If the GEVENT_DEBUG environment variable is not set, "
"this sets it to 'debug'. This can also enable PYTHONTRACEMALLOC and the debug PYTHONMALLOC "
"allocators, if not already set. Defaults to %(default)s."
)
parser.add_argument(
"--package", default="gevent.tests",
help="Load tests from the given package. Defaults to %(default)s."
)
parser.add_argument(
"--processes", "-j", default=DEFAULT_NWORKERS, type=int,
help="Use up to the given number of parallel processes to execute tests. "
"Defaults to %(default)s."
)
parser.add_argument(
'--no-combine', default=True, action='store_false',
help="Do not combine tests into process groups."
)
parser.add_argument('-u', '--use', metavar='RES1,RES2,...',
action='store', type=parse_resources,
help='specify which special resource intensive tests '
'to run. "all" is the default; "none" may also be used. '
'Disable individual resources with a leading -.'
'For example, "-u-network". GEVENTTEST_USE_RESOURCES is used '
'if no argument is given. To only use one resources, specify '
'"-unone,resource".')
parser.add_argument("--travis-fold", metavar="MSG",
help="Emit Travis CI log fold markers around the output.")
fail_parser = parser.add_mutually_exclusive_group()
fail_parser.add_argument(
"--second-chance", action="store_true", default=False,
help="Give failed tests a second chance.")
fail_parser.add_argument(
'--failfast', '-x', action='store_true', default=False,
help="Stop running after the first failure.")
parser.add_argument('tests', nargs='*')
options = parser.parse_args()
# options.use will be either None for not given, or a list
# of the last specified -u argument.
# If not given, use the default, which we'll take from the environment, if set.
options.use = list(set(parse_resources() if options.use is None else options.use))
# Whether or not it came from the environment, put it in the
# environment now.
os.environ['GEVENTTEST_USE_RESOURCES'] = unparse_resources(options.use)
setup_resources(options.use)
# Set this before any test imports in case of 'from .util import QUIET';
# not that this matters much because we spawn tests in subprocesses,
# it's the environment setting that matters
util.QUIET = options.quiet
if 'GEVENTTEST_QUIET' not in os.environ:
os.environ['GEVENTTEST_QUIET'] = str(options.quiet)
FAILING_TESTS = []
IGNORED_TESTS = []
RUN_ALONE = []
coverage = False
if options.coverage or os.environ.get("GEVENTTEST_COVERAGE"):
if PYPY and RUNNING_ON_CI:
print("Ignoring coverage option on PyPy on CI; slow")
else:
coverage = True
cov_config = os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc")
if PYPY:
cov_config = os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc-pypy")
this_dir = os.path.dirname(__file__)
site_dir = os.path.join(this_dir, 'coveragesite')
site_dir = os.path.abspath(site_dir)
os.environ['PYTHONPATH'] = site_dir + os.pathsep + os.environ.get("PYTHONPATH", "")
# We change directory often, use an absolute path to keep all the
# coverage files (which will have distinct suffixes because of parallel=true in .coveragerc
# in this directory; makes them easier to combine and use with coverage report)
os.environ['COVERAGE_FILE'] = os.path.abspath(".") + os.sep + ".coverage"
# XXX: Log this with color. Right now, it interferes (buffering) with other early
# output.
print("Enabling coverage to", os.environ['COVERAGE_FILE'],
"with site", site_dir,
"and configuration file", cov_config)
assert os.path.exists(cov_config)
assert os.path.exists(os.path.join(site_dir, 'sitecustomize.py'))
_setup_environ(debug=options.debug)
if options.config:
config = {}
options.config = _package_relative_filename(options.config, options.package)
with open(options.config) as f: # pylint:disable=unspecified-encoding
config_data = f.read()
six.exec_(config_data, config)
FAILING_TESTS = config['FAILING_TESTS']
IGNORED_TESTS = config['IGNORED_TESTS']
RUN_ALONE = config['RUN_ALONE']
tests = Discovery(
options.tests,
ignore_files=options.ignore,
ignored=IGNORED_TESTS,
coverage=coverage,
package=options.package,
config=config,
allow_combine=options.no_combine,
)
if options.discover:
for cmd, options in tests:
print(util.getname(cmd, env=options.get('env'), setenv=options.get('setenv')))
print('%s tests found.' % len(tests))
else:
if PYPY and RESOLVER_ARES:
# XXX: Add a way to force these.
print("Not running tests on pypy with c-ares; not a supported configuration")
return
if options.package:
# Put this directory on the path so relative imports work.
package_dir = _dir_from_package_name(options.package)
os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', "") + os.pathsep + package_dir
runner = Runner(
tests,
configured_failing_tests=FAILING_TESTS,
failfast=options.failfast,
quiet=options.quiet,
configured_run_alone_tests=RUN_ALONE,
worker_count=options.processes,
second_chance=options.second_chance,
)
if options.travis_fold:
runner = TravisFoldingRunner(runner, options.travis_fold)
runner()
if __name__ == '__main__':
main()