#!/usr/bin/env python
"""
Parallel test runner.

Discovers test files (optionally relative to a package), turns them
into subprocess command lines, runs them on a thread pool and prints a
summary report. The process exit status reflects unexpected failures
and unexpected passes (see :func:`report`).
"""
from __future__ import print_function, absolute_import, division

import re
import sys
import os
import glob
import operator
import traceback
import importlib

from contextlib import contextmanager
from datetime import timedelta
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count

from gevent._util import Lazy

from . import util
from .resources import parse_resources
from .resources import setup_resources
from .resources import unparse_resources
from .sysinfo import RUNNING_ON_CI
from .sysinfo import PYPY
from .sysinfo import PY2
from .sysinfo import RESOLVER_ARES
from .sysinfo import RUN_LEAKCHECKS
from .sysinfo import OSX
from . import six
from . import travis

# Import this while we're probably single-threaded/single-processed
# to try to avoid issues with PyPy 5.10.
# See https://bitbucket.org/pypy/pypy/issues/2769/systemerror-unexpected-internal-exception
try:
    __import__('_testcapi')
except (ImportError, OSError):
    # This can raise a wide variety of errors
    pass

TIMEOUT = 100 # seconds
AVAIL_NWORKERS = cpu_count() - 1
# The NWORKERS environment variable, when set and non-empty, wins;
# otherwise use the available count but never fewer than 4.
DEFAULT_NWORKERS = int(os.environ.get('NWORKERS') or max(AVAIL_NWORKERS, 4))
if DEFAULT_NWORKERS > 15:
    DEFAULT_NWORKERS = 10


if RUN_LEAKCHECKS:
    # Capturing the stats takes time, and we run each
    # test at least twice
    TIMEOUT = 200

# Copied (never shared) into the per-command options.
DEFAULT_RUN_OPTIONS = {
    'timeout': TIMEOUT
}


if RUNNING_ON_CI:
    # Too many and we get spurious timeouts
    DEFAULT_NWORKERS = 4 if not OSX else 2


def _package_relative_filename(filename, package):
    """
    Return *filename*, resolved against *package*'s directory when needed.

    If *filename* is not an existing file (relative to the current
    directory) and *package* is truthy, the result is *filename* joined
    onto the directory of the imported *package*. Otherwise *filename*
    is returned unchanged. The returned path is not checked for
    existence.
    """
    if not os.path.isfile(filename) and package:
        # Ok, try to locate it as a module in the package
        package_dir = _dir_from_package_name(package)
        return os.path.join(package_dir, filename)
    return filename


def _dir_from_package_name(package):
    """
    Import the dotted name *package* and return the directory that
    contains its ``__file__``.

    Importing has the usual side effects and raises ``ImportError`` if
    the package cannot be imported.
    """
    package_mod = importlib.import_module(package)
    package_dir = os.path.dirname(package_mod.__file__)
    return package_dir


class ResultCollector(object):
    """
    Accumulates the results of individual test commands.

    Use ``collector += result`` for a result that counts, and
    ``collector <<= result`` for a failed first attempt that is going
    to be re-run. Result objects are expected to be truthy on success
    and to provide ``name``, ``run_count``, ``skipped_count`` and
    ``run_duration`` attributes (those are the only ones read here).
    """

    def __init__(self):
        # Number of commands (files) scheduled; assigned by the Runner.
        self.total = 0
        # name -> failing result object
        self.failed = {}
        # name -> True
        self.passed = {}
        # Sums of ``run_count`` / ``skipped_count`` over counted results.
        self.total_cases = 0
        self.total_skipped = 0
        # Every RunResult reported: failed, passed, rerun
        self._all_results = []
        # name -> result of a failed first attempt that got a second chance
        self.reran = {}

    def __iadd__(self, result):
        """
        collector += result

        Records *result* as passed or failed (by its truthiness) and
        adds its case and skip counts to the totals.
        """
        self._all_results.append(result)

        if not result:
            self.failed[result.name] = result #[cmd, kwargs]
        else:
            self.passed[result.name] = True
        self.total_cases += result.run_count
        self.total_skipped += result.skipped_count
        return self

    def __ilshift__(self, result):
        """
        collector <<= result

        Stores the result, but does not count it towards
        the number of cases run, skipped, passed or failed.
        """
        self._all_results.append(result)
        self.reran[result.name] = result
        return self

    @property
    def longest_running_tests(self):
        """
        A new list of RunResult objects, sorted from longest running
        to shortest running.
        """
        return sorted(self._all_results,
                      key=operator.attrgetter('run_duration'),
                      reverse=True)


class FailFast(Exception):
    """Raised by a worker to abort the run after the first failure."""


class Runner(object):
    """
    Runs a collection of ``(cmd, options)`` pairs on a thread pool and
    reports the results when called.
    """

    # Seconds to sleep between polls while waiting for jobs to finish
    # (reap) or for a free worker slot (spawn).
    TIME_WAIT_REAP = 0.1
    TIME_WAIT_SPAWN = 0.05

    def __init__(self,
                 tests,
                 configured_failing_tests=(),
                 failfast=False,
                 quiet=False,
                 configured_run_alone_tests=(),
                 worker_count=DEFAULT_NWORKERS,
                 second_chance=False):
        """
        :param tests: A sized iterable of ``(cmd, options)`` pairs;
            ``options`` may be falsy.
        :keyword configured_failing_tests: Patterns (see :func:`matches`)
            naming tests that are expected to fail.
        :keyword failfast: If true, stop at the first failure. Mutually
            exclusive with *second_chance*.
        :keyword quiet: Set to True or False to explicitly choose. Set to
            `None` to use the default, which may come from the environment variable
            ``GEVENTTEST_QUIET``.
        :keyword configured_run_alone_tests: Patterns naming tests that
            must run by themselves after the parallel ones.
        :keyword worker_count: Upper bound on concurrency; capped at the
            number of tests and never less than 1.
        :keyword second_chance: If true, re-run a failing command once.
        """
        self._tests = tests
        self._configured_failing_tests = configured_failing_tests
        self._quiet = quiet
        self._configured_run_alone_tests = configured_run_alone_tests
        assert not (failfast and second_chance)
        self._failfast = failfast
        self._second_chance = second_chance
        self.results = ResultCollector()
        self.results.total = len(self._tests)
        # Outstanding AsyncResult objects from the pool.
        self._running_jobs = []

        # ``or 1`` keeps the pool valid when there are no tests at all.
        self._worker_count = min(len(tests), worker_count) or 1

    def _run_one(self, cmd, **kwargs):
        """
        Run a single command through ``util.run`` and record the result.

        With *second_chance*, a failing result is stored as "reran" and
        the command is executed once more; only the second result is
        counted. With *failfast*, a failure raises :exc:`FailFast`
        instead of being recorded.
        """
        if self._quiet is not None:
            kwargs['quiet'] = self._quiet
        result = util.run(cmd, **kwargs)
        if not result and self._second_chance:
            self.results <<= result
            util.log("> %s", result.name, color='warning')
            result = util.run(cmd, **kwargs)
        if not result and self._failfast:
            # Under Python 3.9 (maybe older versions?), raising the
            # SystemExit here (a background thread belonging to the
            # pool) doesn't seem to work well. It gets stuck waiting
            # for a lock? The job never shows up as finished.
            raise FailFast(cmd)
        self.results += result

    def _reap(self):
        "Clean up the list of running jobs, returning how many are still outstanding."
        # Iterate over a copy: finished jobs are removed from the list.
        for r in self._running_jobs[:]:
            if not r.ready():
                continue
            if r.successful():
                self._running_jobs.remove(r)
            else:
                # ``AsyncResult.get`` re-raises the exception raised by the
                # job (e.g. FailFast), so the exit below is only a fallback.
                r.get()
                sys.exit('Internal error in testrunner.py: %r' % (r, ))
        return len(self._running_jobs)

    def _reap_all(self):
        """Poll until no submitted job is outstanding."""
        util.log("Reaping %d jobs", len(self._running_jobs), color="debug")
        while self._running_jobs:
            if not self._reap():
                break
            util.sleep(self.TIME_WAIT_REAP)

    def _spawn(self, pool, cmd, options):
        """
        Submit *cmd* to *pool*, waiting (by polling) until fewer than
        the configured number of jobs are outstanding.
        """
        while True:
            if self._reap() < self._worker_count:
                job = pool.apply_async(self._run_one, (cmd, ), options or {})
                self._running_jobs.append(job)
                return

            util.sleep(self.TIME_WAIT_SPAWN)

    def __call__(self):
        """
        Run all the tests and print the report.

        On normal completion the report is produced with ``exit=True``,
        so this may raise ``SystemExit`` (see :func:`report`). On
        ``KeyboardInterrupt`` a partial report is printed and the
        interrupt is re-raised; any other exception has its traceback
        printed and is re-raised.
        """
        util.log("Running tests in parallel with concurrency %s %s." % (
            self._worker_count,
            util._colorize('number', '(concurrency available: %d)' % AVAIL_NWORKERS)
        ),)
        # Setting global state, in theory we can be used multiple times.
        # This is fine as long as we are single threaded and call these
        # sequentially.
        util.BUFFER_OUTPUT = self._worker_count > 1 or self._quiet

        start = util.perf_counter()
        try:
            self._run_tests()
        except KeyboardInterrupt:
            self._report(util.perf_counter() - start, exit=False)
            util.log('(partial results)\n')
            raise
        except:
            traceback.print_exc()
            raise

        self._reap_all()
        self._report(util.perf_counter() - start, exit=True)

    def _run_tests(self):
        "Runs the tests, produces no report."
        run_alone = []

        tests = self._tests
        pool = ThreadPool(self._worker_count)
        try:
            for cmd, options in tests:
                options = options or {}
                if matches(self._configured_run_alone_tests, cmd):
                    run_alone.append((cmd, options))
                else:
                    self._spawn(pool, cmd, options)
            pool.close()
            pool.join()

            # Standalone tests run serially, in this thread, once the
            # pool has drained.
            if run_alone:
                util.log("Running tests marked standalone")
                for cmd, options in run_alone:
                    self._run_one(cmd, **options)
        except KeyboardInterrupt:
            # First interrupt: let the running jobs finish. A second
            # interrupt while waiting terminates the pool and propagates.
            try:
                util.log('Waiting for currently running to finish...')
                self._reap_all()
            except KeyboardInterrupt:
                pool.terminate()
                raise
        except:
            pool.terminate()
            raise

    def _report(self, elapsed_time, exit=False):
        """Print the summary for the results gathered so far."""
        results = self.results
        report(
            results,
            exit=exit,
            took=elapsed_time,
            configured_failing_tests=self._configured_failing_tests,
        )


class TravisFoldingRunner(object):
    """
    Wraps a :class:`Runner` so that the output of running the tests
    (but not the final report) is surrounded by Travis CI fold markers.
    """

    def __init__(self, runner, travis_fold_msg):
        self._runner = runner
        self._travis_fold_msg = travis_fold_msg
        # The fold is named after the current performance counter value.
        self._travis_fold_name = str(int(util.perf_counter()))

        # A zope-style acquisition proxy would be convenient here.
        # Instead, replace the runner's ``_run_tests`` on the instance
        # with a wrapper that emits the fold markers around it.
        run_tests = runner._run_tests

        def _run_tests():
            self._begin_fold()
            try:
                run_tests()
            finally:
                self._end_fold()

        runner._run_tests = _run_tests

    def _begin_fold(self):
        travis.fold_start(self._travis_fold_name,
                          self._travis_fold_msg)

    def _end_fold(self):
        travis.fold_end(self._travis_fold_name)

    def __call__(self):
        return self._runner()


class Discovery(object):
    """
    Lazily discovers the test commands to run.

    Iterating an instance yields ``(cmd, options)`` pairs and ``len()``
    gives their number; both trigger discovery on first use.
    """
    package_dir = None
    package = None

    def __init__(
            self,
            tests=None,
            ignore_files=None,
            ignored=(),
            coverage=False,
            package=None,
            config=None,
            allow_combine=True,
    ):
        """
        :keyword tests: Explicit test file/module names. If empty, files
            matching ``test_*.py`` in the package directory are used.
        :keyword ignore_files: Comma-separated names of files listing
            tests to ignore (see :func:`load_list_from_file`).
        :keyword ignored: Iterable of test names to ignore.
        :keyword coverage: If true, the config's ``IGNORE_COVERAGE``
            entries are ignored as well.
        :keyword package: Dotted name of the package holding the tests.
        :keyword config: Mapping produced by executing the config file;
            ``None`` is treated as an empty mapping.
        :keyword allow_combine: Whether several test files may be
            combined into one ``unittest`` process.
        """
        self.config = config or {}
        self.ignore = set(ignored or ())
        self.tests = tests
        # Read from the normalized ``self.config`` so that ``config=None``
        # works, and default to a mapping because ``Discovered`` calls
        # ``.get(filename, {})`` on this value.
        self.configured_test_options = self.config.get('TEST_FILE_OPTIONS', {})
        self.allow_combine = allow_combine
        if ignore_files:
            ignore_files = ignore_files.split(',')
            for f in ignore_files:
                self.ignore.update(set(load_list_from_file(f, package)))

        if coverage:
            self.ignore.update(self.config.get('IGNORE_COVERAGE', set()))

        if package:
            self.package = package
            self.package_dir = _dir_from_package_name(package)

    class Discovered(object):
        """
        Classifies visited test files and builds the command list.

        Each file either becomes its own command, is queued to be
        combined with others into a ``python -m unittest`` command, or
        (if it contains ``TESTRUNNER``) is imported later to supply its
        own commands. The result is in ``self.commands``.
        """

        def __init__(self, package, configured_test_options, ignore, config, allow_combine):
            # Directory that was current at construction time; imports
            # are expanded from there.
            self.orig_dir = os.getcwd()
            # Use ``.get`` so that an empty config (the default for
            # ``Discovery``) means "nothing configured" rather than KeyError.
            self.configured_run_alone = config.get('RUN_ALONE', ())
            self.configured_failing_tests = config.get('FAILING_TESTS', ())
            self.package = package
            self.configured_test_options = configured_test_options
            self.allow_combine = allow_combine
            self.ignore = ignore

            # Qualified names of modules that define TESTRUNNER.
            self.to_import = []
            # Names to combine: files that call patch_all() at top level,
            # and files that do no monkey-patching at all, respectively.
            self.std_monkey_patch_files = []
            self.no_monkey_patch_files = []

            # The resulting list of (cmd, options) pairs.
            self.commands = []

        # NOTE(review): several byte patterns below begin with a single
        # space. If this file's whitespace was collapsed at some point,
        # the intended prefix may have been a four-space indent --
        # confirm against the tests being scanned.
        @staticmethod
        def __makes_simple_monkey_patch(
                contents,
                _patch_present=re.compile(br'[^#].*patch_all\(\)'),
                _patch_indented=re.compile(br' .*patch_all\(\)')
        ):
            """Does the file (as bytes) call ``patch_all()`` unconditionally?"""
            return (
                # A non-commented patch_all() call is present
                bool(_patch_present.search(contents))
                # that is not indented (because that implies its not at the top-level,
                # so some preconditions are being set)
                and not _patch_indented.search(contents)
            )

        @staticmethod
        def __file_allows_monkey_combine(contents):
            """True unless the file carries the ``testrunner-no-monkey-combine`` marker."""
            return b'testrunner-no-monkey-combine' not in contents

        @staticmethod
        def __file_allows_combine(contents):
            """True unless the file carries the ``testrunner-no-combine`` marker."""
            return b'testrunner-no-combine' not in contents

        @staticmethod
        def __calls_unittest_main_toplevel(
                contents,
                _greentest_main=re.compile(br' greentest.main\(\)'),
                _unittest_main=re.compile(br' unittest.main\(\)'),
                _import_main=re.compile(br'from gevent.testing import.*main'),
                _main=re.compile(br' main\(\)'),
        ):
            """
            Does the file appear to run its tests through a unittest-style
            ``main()``? Returns a truthy match object or a falsy value.
            """
            # TODO: Add a check that this comes in a line directly after
            # if __name__ == __main__.
            return (
                _greentest_main.search(contents)
                or _unittest_main.search(contents)
                or (_import_main.search(contents) and _main.search(contents))
            )

        def __has_config(self, filename):
            """
            Is there anything special about *filename* (or the run as a
            whole, when leak checks are on) that prevents combining it?
            """
            return (
                RUN_LEAKCHECKS
                or filename in self.configured_test_options
                or filename in self.configured_run_alone
                or matches(self.configured_failing_tests, filename)
            )

        def __can_monkey_combine(self, filename, contents):
            """May this file be grouped with other patch_all() tests?"""
            return (
                self.allow_combine
                and not self.__has_config(filename)
                and self.__makes_simple_monkey_patch(contents)
                and self.__file_allows_monkey_combine(contents)
                and self.__file_allows_combine(contents)
                and self.__calls_unittest_main_toplevel(contents)
            )

        @staticmethod
        def __makes_no_monkey_patch(contents, _patch_present=re.compile(br'[^#].*patch_\w*\(')):
            """True if no non-commented ``patch_*(`` call is found."""
            return not _patch_present.search(contents)

        def __can_nonmonkey_combine(self, filename, contents):
            """May this file be grouped with other non-patching tests?"""
            return (
                self.allow_combine
                and not self.__has_config(filename)
                and self.__makes_no_monkey_patch(contents)
                and self.__file_allows_combine(contents)
                and self.__calls_unittest_main_toplevel(contents)
            )

        def __begin_command(self):
            """Return a new list starting an unbuffered (``-u``) Python command."""
            cmd = [sys.executable, '-u']
            # XXX: -X track-resources is broken. This happened when I updated to
            # PyPy 7.3.2. It started failing to even start inside the virtual environment
            # with
            #
            # debug: OperationError:
            # debug:  operror-type: ImportError
            # debug:  operror-value: No module named traceback
            #
            # I don't know if this is PyPy's problem or a problem in virtualenv:
            #
            # virtualenv==20.0.35
            # virtualenv-clone==0.5.4
            # virtualenvwrapper==4.8.4
            #
            # Deferring investigation until I need this...

            # if PYPY and PY2:
            #     # Doesn't seem to be an env var for this.
            #     # XXX: track-resources is broken in virtual environments
            #     # on 7.3.2.
            #     cmd.extend(('-X', 'track-resources'))
            return cmd

        def __add_test(self, qualified_name, filename, contents):
            """
            Classify one test file (given its bytes *contents*) and
            either queue it for importing/combining or add a command
            that runs it by itself.
            """
            if b'TESTRUNNER' in contents: # test__monkey_patching.py
                # XXX: Rework this to avoid importing.
                # XXX: Rework this to allow test combining (it could write the files out and return
                # them directly; we would use 'python -m gevent.monkey --module unittest ...)
                self.to_import.append(qualified_name)
            elif self.__can_monkey_combine(filename, contents):
                self.std_monkey_patch_files.append(qualified_name if self.package else filename)
            elif self.__can_nonmonkey_combine(filename, contents):
                self.no_monkey_patch_files.append(qualified_name if self.package else filename)
            else:
                # XXX: For simple python module tests, try this with
                # `runpy.run_module`, very similar to the way we run
                # things for monkey patching. The idea here is that we
                # can perform setup ahead of time (e.g.,
                # setup_resources()) in each test without having to do
                # it manually or force calls or modifications to those
                # tests.
                cmd = self.__begin_command()
                if self.package:
                    # Using a package is the best way to work with coverage 5
                    # when we specify 'source = <package>'
                    cmd.append('-m' + qualified_name)
                else:
                    cmd.append(filename)

                options = DEFAULT_RUN_OPTIONS.copy()
                options.update(self.configured_test_options.get(filename, {}))
                self.commands.append((cmd, options))

        @staticmethod
        def __remove_options(lst):
            """Return the non-empty items of *lst* that do not start with ``-``."""
            return [x for x in lst if x and not x.startswith('-')]

        def __expand_imports(self):
            """
            Import every queued module and add the commands produced by
            its ``TESTRUNNER()`` callable, skipping those whose last
            non-option argument is ignored.
            """
            for qualified_name in self.to_import:
                module = importlib.import_module(qualified_name)
                for cmd, options in module.TESTRUNNER():
                    if self.__remove_options(cmd)[-1] in self.ignore:
                        continue
                    self.commands.append((cmd, options))
            del self.to_import[:]

        def __combine_commands(self, files, group_size=5):
            """
            Turn *files* into ``python -u -m unittest ...`` commands of at
            most *group_size* names each, and empty *files*.

            Each combined command is inserted at the front of
            ``self.commands`` so that it is scheduled before the
            single-file commands.
            """
            if not files:
                return

            # Fixed-size chunks. (The earlier counter-based grouping
            # let ``group_size + 1`` names into each group.)
            group_size = max(1, group_size)
            for start in range(0, len(files), group_size):
                cmd = self.__begin_command()
                cmd.append('-m')
                cmd.append('unittest')
                # cmd.append('-v')
                cmd.extend(files[start:start + group_size])
                self.commands.insert(0, (cmd, DEFAULT_RUN_OPTIONS.copy()))

            del files[:]

        def visit_file(self, filename):
            """
            Read the test named *filename* (relative to the current
            directory) and classify it.

            *filename* may be a path such as ``test_foo.py``, a bare
            ``test_foo``, or a dotted ``gevent.tests.test_foo``. Raises
            the usual error from ``open`` if the file cannot be read.
            """
            # Support either 'gevent.tests.foo' or 'gevent/tests/foo.py'
            if filename.startswith('gevent.tests'):
                # XXX: How does this interact with 'package'? Probably not well
                qualified_name = module_name = filename
                filename = filename[len('gevent.tests') + 1:]
                filename = filename.replace('.', os.sep) + '.py'
            else:
                module_name = os.path.splitext(filename)[0]
                qualified_name = self.package + '.' + module_name if self.package else module_name

            # Also allow just 'foo' as a shortcut for 'gevent.tests.foo'
            abs_filename = os.path.abspath(filename)
            if (
                    not os.path.exists(abs_filename)
                    and not filename.endswith('.py')
                    and os.path.exists(abs_filename + '.py')
            ):
                abs_filename += '.py'

            with open(abs_filename, 'rb') as f:
                # Some of the test files (e.g., test__socket_dns) are
                # UTF8 encoded. Depending on the environment, Python 3 may
                # try to decode those as ASCII, which fails with UnicodeDecodeError.
                # Thus, be sure to open and compare in binary mode.
                # Open the absolute path to make errors more clear,
                # but we can't store the absolute path, our configuration is based on
                # relative file names.
                contents = f.read()

            self.__add_test(qualified_name, filename, contents)

        def visit_files(self, filenames):
            """Visit each of *filenames*, then finalize ``self.commands``."""
            for filename in filenames:
                self.visit_file(filename)
            # Imports are expanded from the directory we started in,
            # not the package directory the caller may have entered.
            with Discovery._in_dir(self.orig_dir):
                self.__expand_imports()
            self.__combine_commands(self.std_monkey_patch_files)
            self.__combine_commands(self.no_monkey_patch_files)

    @staticmethod
    @contextmanager
    def _in_dir(package_dir):
        """
        Context manager that changes into *package_dir* (if truthy) and
        always changes back to the previous directory on exit.
        """
        olddir = os.getcwd()
        if package_dir:
            os.chdir(package_dir)
        try:
            yield
        finally:
            os.chdir(olddir)

    # ``Lazy`` comes from gevent._util; presumably it computes the value
    # once and caches it on the instance -- verify against that helper.
    @Lazy
    def discovered(self):
        """The populated :class:`Discovered` object for this configuration."""
        tests = self.tests
        discovered = self.Discovered(self.package, self.configured_test_options,
                                     self.ignore, self.config, self.allow_combine)

        # We need to glob relative names, our config is based on filenames still
        with self._in_dir(self.package_dir):
            if not tests:
                tests = set(glob.glob('test_*.py')) - set(['test_support.py'])
            else:
                tests = set(tests)

            if self.ignore:
                # Always ignore the designated list, even if tests
                # were specified on the command line. This fixes a
                # nasty interaction with
                # test__threading_vs_settrace.py being run under
                # coverage when 'grep -l subprocess test*py' is used
                # to list the tests to run.
                tests -= self.ignore
            # Sorted so the visiting order is deterministic.
            tests = sorted(tests)
            discovered.visit_files(tests)

        return discovered

    def __iter__(self):
        return iter(self.discovered.commands) # pylint:disable=no-member

    def __len__(self):
        return len(self.discovered.commands) # pylint:disable=no-member


def load_list_from_file(filename, package):
    """
    Return the non-empty entries of the list file *filename*.

    Each line is one entry; anything from ``#`` to the end of the line
    is a comment, and surrounding whitespace is stripped. The file is
    located with :func:`_package_relative_filename`. A falsy *filename*
    produces an empty list.
    """
    result = []
    if filename:
        # pylint:disable=unspecified-encoding
        with open(_package_relative_filename(filename, package)) as f:
            for x in f:
                x = x.split('#', 1)[0].strip()
                if x:
                    result.append(x)
    return result


def matches(possibilities, command, include_flaky=True):
    """
    Does *command* match any of the configured *possibilities*?

    :param possibilities: Iterable of strings, each optionally
        containing the marker ``FLAKY ``.
    :param command: A string, or a list of arguments that is joined
        with spaces.
    :keyword include_flaky: If false, entries that start with
        ``FLAKY `` are skipped.
    :return: True if *command* ends with ``' ' + entry``, ends with the
        entry stripped of ``.py``, or (having no spaces) equals the entry.
    """
    if isinstance(command, list):
        command = ' '.join(command)
    for line in possibilities:
        if not include_flaky and line.startswith('FLAKY '):
            continue
        line = line.replace('FLAKY ', '')
        # Our configs are still mostly written in terms of file names,
        # but the non-monkey tests are now using package names.
        # Strip off '.py' from filenames to see if we match a module.
        # XXX: This could be much better. Our command needs better structure.
        if command.endswith(' ' + line) or command.endswith(line.replace(".py", '')):
            return True
        if ' ' not in command and command == line:
            return True
    return False


def format_seconds(seconds):
    """
    Format a duration for display.

    Under 20 seconds the result looks like ``'3.2s'``; otherwise it is
    the ``str`` of a ``timedelta`` rounded to whole seconds, with a
    leading ``'0:'`` hours field removed (e.g. ``'01:05'``).
    """
    if seconds < 20:
        return '%.1fs' % seconds
    seconds = str(timedelta(seconds=round(seconds)))
    if seconds.startswith('0:'):
        seconds = seconds[2:]
    return seconds


def _show_longest_running(result_collector, how_many=5):
    """
    Log the longest-running results until *how_many* distinct test
    names have been shown. Logs nothing if there are no results.
    """
    longest_running_tests = result_collector.longest_running_tests
    if not longest_running_tests:
        return
    # The only tricky part is handling repeats. we want to show them,
    # but not count them as a distinct entry.

    util.log('\nLongest-running tests:')
    # The list is sorted longest first, so the first entry fixes the
    # column width.
    length_of_longest_formatted_decimal = len('%.1f' % longest_running_tests[0].run_duration)
    frmt = '%' + str(length_of_longest_formatted_decimal) + '.1f seconds: %s'
    seen_names = set()
    for result in longest_running_tests:
        util.log(frmt, result.run_duration, result.name)
        seen_names.add(result.name)
        if len(seen_names) >= how_many:
            break


def report(result_collector, # type: ResultCollector
           exit=True, took=None,
           configured_failing_tests=()):
    """
    Log a summary of the collected results.

    :keyword exit: If true, finish with ``sys.exit``: the number of
        unexpected failures (capped at 100) if there are any, else 101
        if there are unexpected passes, else the message
        ``'No tests found.'`` if nothing was scheduled. If none of those
        apply the function simply returns.
    :keyword took: Elapsed time in seconds, or a falsy value to omit it.
    :keyword configured_failing_tests: Patterns of tests expected to
        fail. Entries marked ``FLAKY`` excuse a failure, but do not
        make a pass unexpected.
    """
    # pylint:disable=redefined-builtin,too-many-branches,too-many-locals
    total = result_collector.total
    failed = result_collector.failed
    passed = result_collector.passed
    total_cases = result_collector.total_cases
    total_skipped = result_collector.total_skipped

    _show_longest_running(result_collector)

    if took:
        took = ' in %s' % format_seconds(took)
    else:
        took = ''

    failed_expected = []
    failed_unexpected = []
    passed_unexpected = []

    for name in passed:
        if matches(configured_failing_tests, name, include_flaky=False):
            passed_unexpected.append(name)

    if passed_unexpected:
        util.log('\n%s/%s unexpected passes', len(passed_unexpected), total, color='error')
        print_list(passed_unexpected)

    if result_collector.reran:
        util.log('\n%s/%s tests rerun', len(result_collector.reran), total, color='warning')
        print_list(result_collector.reran)

    if failed:
        util.log('\n%s/%s tests failed%s', len(failed), total, took, color='warning')

        for name in failed:
            if matches(configured_failing_tests, name, include_flaky=True):
                failed_expected.append(name)
            else:
                failed_unexpected.append(name)

        if failed_expected:
            util.log('\n%s/%s expected failures', len(failed_expected), total, color='warning')
            print_list(failed_expected)

        if failed_unexpected:
            util.log('\n%s/%s unexpected failures', len(failed_unexpected), total, color='error')
            print_list(failed_unexpected)

    util.log(
        '\nRan %s tests%s in %s files%s',
        total_cases,
        util._colorize('skipped', " (skipped=%d)" % total_skipped) if total_skipped else '',
        total,
        took,
    )

    if exit:
        if failed_unexpected:
            sys.exit(min(100, len(failed_unexpected)))
        if passed_unexpected:
            sys.exit(101)
        if total <= 0:
            sys.exit('No tests found.')


def print_list(lst):
    """Log each name in *lst* as a list item."""
    # NOTE(review): the literal has single spaces around the dash; it
    # may originally have been indented further -- confirm.
    for name in lst:
        util.log(' - %s', name)


def _setup_environ(debug=False):
    """
    Fill in the ``PYTHON*``/``GEVENT*`` environment variables that the
    test subprocesses inherit, then log the resulting values.

    Variables that are already set to a non-empty value are left alone,
    except that on a non-final Python without *debug*, ``PYTHONMALLOC``
    and ``PYTHONDEVMODE`` are always overridden.

    :keyword debug: Also enable ``GEVENT_DEBUG``, ``PYTHONTRACEMALLOC``
        and the debug ``PYTHONMALLOC`` allocator.
    """
    def not_set(key):
        # Unset and set-to-empty are treated the same.
        return not bool(os.environ.get(key))

    if (not_set('PYTHONWARNINGS')
            and (not sys.warnoptions
                 # Python 3.7 goes from [] to ['default'] for nothing
                 or sys.warnoptions == ['default'])):

        # action:message:category:module:line

        # - when a warning matches
        #   more than one option, the action for the last matching
        #   option is performed.
        # - action is one of : ignore, default, all, module, once, error

        # Enable default warnings such as ResourceWarning.
        # ResourceWarning doesn't exist on Py2, so don't put it
        # in there to avoid a warning.
        defaults = [
            'default',
            'default::DeprecationWarning',
        ]
        if not PY2:
            defaults.append('default::ResourceWarning')

        os.environ['PYTHONWARNINGS'] = ','.join(defaults + [
            # On Python 3[.6], the system site.py module has
            # "open(fullname, 'rU')" which produces the warning that
            # 'U' is deprecated, so ignore warnings from site.py
            'ignore:::site:',
            # pkgutil on Python 2 complains about missing __init__.py
            'ignore:::pkgutil:',
            # importlib/_bootstrap.py likes to spit out "ImportWarning:
            # can't resolve package from __spec__ or __package__, falling
            # back on __name__ and __path__". I have no idea what that means, but it seems harmless
            # and is annoying.
            'ignore:::importlib._bootstrap:',
            'ignore:::importlib._bootstrap_external:',
            # importing ABCs from collections, not collections.abc
            'ignore:::pkg_resources._vendor.pyparsing:',
            'ignore:::dns.namedict:',
            # dns.hash itself is being deprecated, importing it raises the warning;
            # we don't import it, but dnspython still does
            'ignore:::dns.hash:',
            # dns.zone uses some raw regular expressions
            # without the r'' syntax, leading to DeprecationWarning: invalid
            # escape sequence. This is fixed in 2.0 (Python 3 only).
            'ignore:::dns.zone:',
        ])

    if not_set('PYTHONFAULTHANDLER'):
        os.environ['PYTHONFAULTHANDLER'] = 'true'

    if not_set('GEVENT_DEBUG') and debug:
        os.environ['GEVENT_DEBUG'] = 'debug'

    if not_set('PYTHONTRACEMALLOC') and debug:
        # This slows the tests down quite a bit. Reserve
        # for debugging.
        os.environ['PYTHONTRACEMALLOC'] = '10'

    if not_set('PYTHONDEVMODE'):
        # Python 3.7 and above.
        os.environ['PYTHONDEVMODE'] = '1'

    if not_set('PYTHONMALLOC') and debug:
        # Python 3.6 and above.
        # This slows the tests down some, but
        # can detect memory corruption. Unfortunately
        # it can also be flaky, especially in pre-release
        # versions of Python (e.g., lots of crashes on Python 3.8b4).
        os.environ['PYTHONMALLOC'] = 'debug'

    if sys.version_info.releaselevel != 'final' and not debug:
        os.environ['PYTHONMALLOC'] = 'default'
        os.environ['PYTHONDEVMODE'] = ''

    # PYTHONFAULTHANDLER is always present by now, so this is never empty.
    interesting_envs = {
        k: os.environ[k]
        for k in os.environ
        if k.startswith(('PYTHON', 'GEVENT'))
    }
    widest_k = max(len(k) for k in interesting_envs)
    for k, v in sorted(interesting_envs.items()):
        util.log('%*s\t=\t%s', widest_k, k, v, color="debug")


def main():
    """
    Command-line entry point: parse arguments, prepare the environment,
    discover the tests and either list them (``--discover``) or run them.

    Running the tests normally ends in ``SystemExit`` via :func:`report`
    when there is something to signal.
    """
    # pylint:disable=too-many-locals,too-many-statements
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--ignore')
    parser.add_argument(
        '--discover', action='store_true',
        help="Only print the tests found."
    )
    parser.add_argument(
        '--config', default='known_failures.py',
        help="The path to the config file containing "
        "FAILING_TESTS, IGNORED_TESTS and RUN_ALONE. "
        "Defaults to %(default)s."
    )
    parser.add_argument(
        "--coverage", action="store_true",
        help="Enable coverage recording with coverage.py."
    )
    # TODO: Quiet and verbose should be mutually exclusive
    parser.add_argument(
        "--quiet", action="store_true", default=True,
        help="Be quiet. Defaults to %(default)s. Also the "
        "GEVENTTEST_QUIET environment variable."
    )
    parser.add_argument("--verbose", action="store_false", dest='quiet')

    parser.add_argument(
        "--debug", action="store_true", default=False,
        help="Enable debug settings. If the GEVENT_DEBUG environment variable is not set, "
        "this sets it to 'debug'. This can also enable PYTHONTRACEMALLOC and the debug PYTHONMALLOC "
        "allocators, if not already set. Defaults to %(default)s."
    )

    parser.add_argument(
        "--package", default="gevent.tests",
        help="Load tests from the given package. Defaults to %(default)s."
    )
    parser.add_argument(
        "--processes", "-j", default=DEFAULT_NWORKERS, type=int,
        help="Use up to the given number of parallel processes to execute tests. "
        "Defaults to %(default)s."
    )
    # Stored as ``options.no_combine``: True (combining allowed) unless
    # the flag is given.
    parser.add_argument(
        '--no-combine', default=True, action='store_false',
        help="Do not combine tests into process groups."
    )
    parser.add_argument('-u', '--use', metavar='RES1,RES2,...',
                        action='store', type=parse_resources,
                        help='specify which special resource intensive tests '
                        'to run. "all" is the default; "none" may also be used. '
                        'Disable individual resources with a leading -.'
                        'For example, "-u-network". GEVENTTEST_USE_RESOURCES is used '
                        'if no argument is given. To only use one resources, specify '
                        '"-unone,resource".')
    parser.add_argument("--travis-fold", metavar="MSG",
                        help="Emit Travis CI log fold markers around the output.")

    fail_parser = parser.add_mutually_exclusive_group()
    fail_parser.add_argument(
        "--second-chance", action="store_true", default=False,
        help="Give failed tests a second chance.")
    fail_parser.add_argument(
        '--failfast', '-x', action='store_true', default=False,
        help="Stop running after the first failure.")

    parser.add_argument('tests', nargs='*')
    options = parser.parse_args()
    # options.use will be either None for not given, or a list
    # of the last specified -u argument.
    # If not given, use the default, which we'll take from the environment, if set.
    options.use = list(set(parse_resources() if options.use is None else options.use))

    # Whether or not it came from the environment, put it in the
    # environment now.
    os.environ['GEVENTTEST_USE_RESOURCES'] = unparse_resources(options.use)
    setup_resources(options.use)

    # Set this before any test imports in case of 'from .util import QUIET';
    # not that this matters much because we spawn tests in subprocesses,
    # it's the environment setting that matters
    util.QUIET = options.quiet
    if 'GEVENTTEST_QUIET' not in os.environ:
        os.environ['GEVENTTEST_QUIET'] = str(options.quiet)

    FAILING_TESTS = []
    IGNORED_TESTS = []
    RUN_ALONE = []
    # Bound up front so that an empty ``--config`` value still leaves a
    # (empty) config to pass to Discovery below.
    config = {}

    coverage = False
    if options.coverage or os.environ.get("GEVENTTEST_COVERAGE"):
        if PYPY and RUNNING_ON_CI:
            print("Ignoring coverage option on PyPy on CI; slow")
        else:
            coverage = True
            cov_config = os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc")
            if PYPY:
                cov_config = os.environ['COVERAGE_PROCESS_START'] = os.path.abspath(".coveragerc-pypy")

            this_dir = os.path.dirname(__file__)
            site_dir = os.path.join(this_dir, 'coveragesite')
            site_dir = os.path.abspath(site_dir)
            os.environ['PYTHONPATH'] = site_dir + os.pathsep + os.environ.get("PYTHONPATH", "")
            # We change directory often, use an absolute path to keep all the
            # coverage files (which will have distinct suffixes because of parallel=true in .coveragerc
            # in this directory; makes them easier to combine and use with coverage report)
            os.environ['COVERAGE_FILE'] = os.path.abspath(".") + os.sep + ".coverage"
            # XXX: Log this with color. Right now, it interferes (buffering) with other early
            # output.
            print("Enabling coverage to", os.environ['COVERAGE_FILE'],
                  "with site", site_dir,
                  "and configuration file", cov_config)
            assert os.path.exists(cov_config)
            assert os.path.exists(os.path.join(site_dir, 'sitecustomize.py'))

    _setup_environ(debug=options.debug)

    if options.config:
        options.config = _package_relative_filename(options.config, options.package)
        with open(options.config) as f: # pylint:disable=unspecified-encoding
            config_data = f.read()
        # NOTE: the config file is executed as Python code; only point
        # --config at files you trust.
        six.exec_(config_data, config)
        FAILING_TESTS = config['FAILING_TESTS']
        IGNORED_TESTS = config['IGNORED_TESTS']
        RUN_ALONE = config['RUN_ALONE']

    tests = Discovery(
        options.tests,
        ignore_files=options.ignore,
        ignored=IGNORED_TESTS,
        coverage=coverage,
        package=options.package,
        config=config,
        allow_combine=options.no_combine,
    )
    if options.discover:
        # A distinct loop name keeps the parsed ``options`` intact.
        for cmd, cmd_options in tests:
            print(util.getname(cmd, env=cmd_options.get('env'), setenv=cmd_options.get('setenv')))
        print('%s tests found.' % len(tests))
    else:
        if PYPY and RESOLVER_ARES:
            # XXX: Add a way to force these.
            print("Not running tests on pypy with c-ares; not a supported configuration")
            return
        if options.package:
            # Put this directory on the path so relative imports work.
            package_dir = _dir_from_package_name(options.package)
            os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', "") + os.pathsep + package_dir
        runner = Runner(
            tests,
            configured_failing_tests=FAILING_TESTS,
            failfast=options.failfast,
            quiet=options.quiet,
            configured_run_alone_tests=RUN_ALONE,
            worker_count=options.processes,
            second_chance=options.second_chance,
        )

        if options.travis_fold:
            runner = TravisFoldingRunner(runner, options.travis_fold)

        runner()


if __name__ == '__main__':
    main()