mirror of
https://gitflic.ru/project/openide/openide.git
synced 2025-12-19 01:50:56 +07:00
866 lines
31 KiB
Python
866 lines
31 KiB
Python
from __future__ import nested_scopes
|
|
|
|
import fnmatch
|
|
import os.path
|
|
from _pydev_runfiles.pydev_runfiles_coverage import start_coverage_support
|
|
from _pydevd_bundle.pydevd_constants import * #@UnusedWildImport
|
|
import re
|
|
import time
|
|
|
|
|
|
#=======================================================================================================================
|
|
# Configuration
|
|
#=======================================================================================================================
|
|
class Configuration:
|
|
|
|
def __init__(
|
|
self,
|
|
files_or_dirs='',
|
|
verbosity=2,
|
|
include_tests=None,
|
|
tests=None,
|
|
port=None,
|
|
files_to_tests=None,
|
|
jobs=1,
|
|
split_jobs='tests',
|
|
coverage_output_dir=None,
|
|
coverage_include=None,
|
|
coverage_output_file=None,
|
|
exclude_files=None,
|
|
exclude_tests=None,
|
|
include_files=None,
|
|
django=False,
|
|
):
|
|
self.files_or_dirs = files_or_dirs
|
|
self.verbosity = verbosity
|
|
self.include_tests = include_tests
|
|
self.tests = tests
|
|
self.port = port
|
|
self.files_to_tests = files_to_tests
|
|
self.jobs = jobs
|
|
self.split_jobs = split_jobs
|
|
self.django = django
|
|
|
|
if include_tests:
|
|
assert isinstance(include_tests, (list, tuple))
|
|
|
|
if exclude_files:
|
|
assert isinstance(exclude_files, (list, tuple))
|
|
|
|
if exclude_tests:
|
|
assert isinstance(exclude_tests, (list, tuple))
|
|
|
|
self.exclude_files = exclude_files
|
|
self.include_files = include_files
|
|
self.exclude_tests = exclude_tests
|
|
|
|
self.coverage_output_dir = coverage_output_dir
|
|
self.coverage_include = coverage_include
|
|
self.coverage_output_file = coverage_output_file
|
|
|
|
def __str__(self):
|
|
return '''Configuration
|
|
- files_or_dirs: %s
|
|
- verbosity: %s
|
|
- tests: %s
|
|
- port: %s
|
|
- files_to_tests: %s
|
|
- jobs: %s
|
|
- split_jobs: %s
|
|
|
|
- include_files: %s
|
|
- include_tests: %s
|
|
|
|
- exclude_files: %s
|
|
- exclude_tests: %s
|
|
|
|
- coverage_output_dir: %s
|
|
- coverage_include_dir: %s
|
|
- coverage_output_file: %s
|
|
|
|
- django: %s
|
|
''' % (
|
|
self.files_or_dirs,
|
|
self.verbosity,
|
|
self.tests,
|
|
self.port,
|
|
self.files_to_tests,
|
|
self.jobs,
|
|
self.split_jobs,
|
|
|
|
self.include_files,
|
|
self.include_tests,
|
|
|
|
self.exclude_files,
|
|
self.exclude_tests,
|
|
|
|
self.coverage_output_dir,
|
|
self.coverage_include,
|
|
self.coverage_output_file,
|
|
|
|
self.django,
|
|
)
|
|
|
|
|
|
#=======================================================================================================================
|
|
# parse_cmdline
|
|
#=======================================================================================================================
|
|
def parse_cmdline(argv=None):
|
|
"""
|
|
Parses command line and returns test directories, verbosity, test filter and test suites
|
|
|
|
usage:
|
|
runfiles.py -v|--verbosity <level> -t|--tests <Test.test1,Test2> dirs|files
|
|
|
|
Multiprocessing options:
|
|
jobs=number (with the number of jobs to be used to run the tests)
|
|
split_jobs='module'|'tests'
|
|
if == module, a given job will always receive all the tests from a module
|
|
if == tests, the tests will be split independently of their originating module (default)
|
|
|
|
--exclude_files = comma-separated list of patterns with files to exclude (fnmatch style)
|
|
--include_files = comma-separated list of patterns with files to include (fnmatch style)
|
|
--exclude_tests = comma-separated list of patterns with test names to exclude (fnmatch style)
|
|
|
|
Note: if --tests is given, --exclude_files, --include_files and --exclude_tests are ignored!
|
|
"""
|
|
if argv is None:
|
|
argv = sys.argv
|
|
|
|
verbosity = 2
|
|
include_tests = None
|
|
tests = None
|
|
port = None
|
|
jobs = 1
|
|
split_jobs = 'tests'
|
|
files_to_tests = {}
|
|
coverage_output_dir = None
|
|
coverage_include = None
|
|
exclude_files = None
|
|
exclude_tests = None
|
|
include_files = None
|
|
django = False
|
|
|
|
from _pydev_bundle._pydev_getopt import gnu_getopt
|
|
optlist, dirs = gnu_getopt(
|
|
argv[1:], "",
|
|
[
|
|
"verbosity=",
|
|
"tests=",
|
|
|
|
"port=",
|
|
"config_file=",
|
|
|
|
"jobs=",
|
|
"split_jobs=",
|
|
|
|
"include_tests=",
|
|
"include_files=",
|
|
|
|
"exclude_files=",
|
|
"exclude_tests=",
|
|
|
|
"coverage_output_dir=",
|
|
"coverage_include=",
|
|
|
|
"django="
|
|
]
|
|
)
|
|
|
|
for opt, value in optlist:
|
|
if opt in ("-v", "--verbosity"):
|
|
verbosity = value
|
|
|
|
elif opt in ("-p", "--port"):
|
|
port = int(value)
|
|
|
|
elif opt in ("-j", "--jobs"):
|
|
jobs = int(value)
|
|
|
|
elif opt in ("-s", "--split_jobs"):
|
|
split_jobs = value
|
|
if split_jobs not in ('module', 'tests'):
|
|
raise AssertionError('Expected split to be either "module" or "tests". Was :%s' % (split_jobs,))
|
|
|
|
elif opt in ("-d", "--coverage_output_dir",):
|
|
coverage_output_dir = value.strip()
|
|
|
|
elif opt in ("-i", "--coverage_include",):
|
|
coverage_include = value.strip()
|
|
|
|
elif opt in ("-I", "--include_tests"):
|
|
include_tests = value.split(',')
|
|
|
|
elif opt in ("-E", "--exclude_files"):
|
|
exclude_files = value.split(',')
|
|
|
|
elif opt in ("-F", "--include_files"):
|
|
include_files = value.split(',')
|
|
|
|
elif opt in ("-e", "--exclude_tests"):
|
|
exclude_tests = value.split(',')
|
|
|
|
elif opt in ("-t", "--tests"):
|
|
tests = value.split(',')
|
|
|
|
elif opt in ("--django",):
|
|
django = value.strip() in ['true', 'True', '1']
|
|
|
|
elif opt in ("-c", "--config_file"):
|
|
config_file = value.strip()
|
|
if os.path.exists(config_file):
|
|
f = open(config_file, 'rU')
|
|
try:
|
|
config_file_contents = f.read()
|
|
finally:
|
|
f.close()
|
|
|
|
if config_file_contents:
|
|
config_file_contents = config_file_contents.strip()
|
|
|
|
if config_file_contents:
|
|
for line in config_file_contents.splitlines():
|
|
file_and_test = line.split('|')
|
|
if len(file_and_test) == 2:
|
|
file, test = file_and_test
|
|
if file in files_to_tests:
|
|
files_to_tests[file].append(test)
|
|
else:
|
|
files_to_tests[file] = [test]
|
|
|
|
else:
|
|
sys.stderr.write('Could not find config file: %s\n' % (config_file,))
|
|
|
|
if type([]) != type(dirs):
|
|
dirs = [dirs]
|
|
|
|
ret_dirs = []
|
|
for d in dirs:
|
|
if '|' in d:
|
|
#paths may come from the ide separated by |
|
|
ret_dirs.extend(d.split('|'))
|
|
else:
|
|
ret_dirs.append(d)
|
|
|
|
verbosity = int(verbosity)
|
|
|
|
if tests:
|
|
if verbosity > 4:
|
|
sys.stdout.write('--tests provided. Ignoring --exclude_files, --exclude_tests and --include_files\n')
|
|
exclude_files = exclude_tests = include_files = None
|
|
|
|
config = Configuration(
|
|
ret_dirs,
|
|
verbosity,
|
|
include_tests,
|
|
tests,
|
|
port,
|
|
files_to_tests,
|
|
jobs,
|
|
split_jobs,
|
|
coverage_output_dir,
|
|
coverage_include,
|
|
exclude_files=exclude_files,
|
|
exclude_tests=exclude_tests,
|
|
include_files=include_files,
|
|
django=django,
|
|
)
|
|
|
|
if verbosity > 5:
|
|
sys.stdout.write(str(config) + '\n')
|
|
return config
|
|
|
|
|
|
#=======================================================================================================================
|
|
# PydevTestRunner
|
|
#=======================================================================================================================
|
|
class PydevTestRunner(object):
|
|
""" finds and runs a file or directory of files as a unit test """
|
|
|
|
__py_extensions = ["*.py", "*.pyw"]
|
|
__exclude_files = ["__init__.*"]
|
|
|
|
#Just to check that only this attributes will be written to this file
|
|
__slots__ = [
|
|
'verbosity', #Always used
|
|
|
|
'files_to_tests', #If this one is given, the ones below are not used
|
|
|
|
'files_or_dirs', #Files or directories received in the command line
|
|
'include_tests', #The filter used to collect the tests
|
|
'tests', #Strings with the tests to be run
|
|
|
|
'jobs', #Integer with the number of jobs that should be used to run the test cases
|
|
'split_jobs', #String with 'tests' or 'module' (how should the jobs be split)
|
|
|
|
'configuration',
|
|
'coverage',
|
|
]
|
|
|
|
def __init__(self, configuration):
|
|
self.verbosity = configuration.verbosity
|
|
|
|
self.jobs = configuration.jobs
|
|
self.split_jobs = configuration.split_jobs
|
|
|
|
files_to_tests = configuration.files_to_tests
|
|
if files_to_tests:
|
|
self.files_to_tests = files_to_tests
|
|
self.files_or_dirs = list(files_to_tests.keys())
|
|
self.tests = None
|
|
else:
|
|
self.files_to_tests = {}
|
|
self.files_or_dirs = configuration.files_or_dirs
|
|
self.tests = configuration.tests
|
|
|
|
self.configuration = configuration
|
|
self.__adjust_path()
|
|
|
|
|
|
def __adjust_path(self):
|
|
""" add the current file or directory to the python path """
|
|
path_to_append = None
|
|
for n in xrange(len(self.files_or_dirs)):
|
|
dir_name = self.__unixify(self.files_or_dirs[n])
|
|
if os.path.isdir(dir_name):
|
|
if not dir_name.endswith("/"):
|
|
self.files_or_dirs[n] = dir_name + "/"
|
|
path_to_append = os.path.normpath(dir_name)
|
|
elif os.path.isfile(dir_name):
|
|
path_to_append = os.path.dirname(dir_name)
|
|
else:
|
|
if not os.path.exists(dir_name):
|
|
block_line = '*' * 120
|
|
sys.stderr.write('\n%s\n* PyDev test runner error: %s does not exist.\n%s\n' % (block_line, dir_name, block_line))
|
|
return
|
|
msg = ("unknown type. \n%s\nshould be file or a directory.\n" % (dir_name))
|
|
raise RuntimeError(msg)
|
|
if path_to_append is not None:
|
|
#Add it as the last one (so, first things are resolved against the default dirs and
|
|
#if none resolves, then we try a relative import).
|
|
sys.path.append(path_to_append)
|
|
|
|
def __is_valid_py_file(self, fname):
|
|
""" tests that a particular file contains the proper file extension
|
|
and is not in the list of files to exclude """
|
|
is_valid_fname = 0
|
|
for invalid_fname in self.__class__.__exclude_files:
|
|
is_valid_fname += int(not fnmatch.fnmatch(fname, invalid_fname))
|
|
if_valid_ext = 0
|
|
for ext in self.__class__.__py_extensions:
|
|
if_valid_ext += int(fnmatch.fnmatch(fname, ext))
|
|
return is_valid_fname > 0 and if_valid_ext > 0
|
|
|
|
def __unixify(self, s):
|
|
""" stupid windows. converts the backslash to forwardslash for consistency """
|
|
return os.path.normpath(s).replace(os.sep, "/")
|
|
|
|
def __importify(self, s, dir=False):
|
|
""" turns directory separators into dots and removes the ".py*" extension
|
|
so the string can be used as import statement """
|
|
if not dir:
|
|
dirname, fname = os.path.split(s)
|
|
|
|
if fname.count('.') > 1:
|
|
#if there's a file named xxx.xx.py, it is not a valid module, so, let's not load it...
|
|
return
|
|
|
|
imp_stmt_pieces = [dirname.replace("\\", "/").replace("/", "."), os.path.splitext(fname)[0]]
|
|
|
|
if len(imp_stmt_pieces[0]) == 0:
|
|
imp_stmt_pieces = imp_stmt_pieces[1:]
|
|
|
|
return ".".join(imp_stmt_pieces)
|
|
|
|
else: #handle dir
|
|
return s.replace("\\", "/").replace("/", ".")
|
|
|
|
def __add_files(self, pyfiles, root, files):
|
|
""" if files match, appends them to pyfiles. used by os.path.walk fcn """
|
|
for fname in files:
|
|
if self.__is_valid_py_file(fname):
|
|
name_without_base_dir = self.__unixify(os.path.join(root, fname))
|
|
pyfiles.append(name_without_base_dir)
|
|
|
|
|
|
def find_import_files(self):
|
|
""" return a list of files to import """
|
|
if self.files_to_tests:
|
|
pyfiles = self.files_to_tests.keys()
|
|
else:
|
|
pyfiles = []
|
|
|
|
for base_dir in self.files_or_dirs:
|
|
if os.path.isdir(base_dir):
|
|
if hasattr(os, 'walk'):
|
|
for root, dirs, files in os.walk(base_dir):
|
|
|
|
#Note: handling directories that should be excluded from the search because
|
|
#they don't have __init__.py
|
|
exclude = {}
|
|
for d in dirs:
|
|
for init in ['__init__.py', '__init__.pyo', '__init__.pyc', '__init__.pyw', '__init__$py.class']:
|
|
if os.path.exists(os.path.join(root, d, init).replace('\\', '/')):
|
|
break
|
|
else:
|
|
exclude[d] = 1
|
|
|
|
if exclude:
|
|
new = []
|
|
for d in dirs:
|
|
if d not in exclude:
|
|
new.append(d)
|
|
|
|
dirs[:] = new
|
|
|
|
self.__add_files(pyfiles, root, files)
|
|
else:
|
|
# jython2.1 is too old for os.walk!
|
|
os.path.walk(base_dir, self.__add_files, pyfiles)
|
|
|
|
elif os.path.isfile(base_dir):
|
|
pyfiles.append(base_dir)
|
|
|
|
if self.configuration.exclude_files or self.configuration.include_files:
|
|
ret = []
|
|
for f in pyfiles:
|
|
add = True
|
|
basename = os.path.basename(f)
|
|
if self.configuration.include_files:
|
|
add = False
|
|
|
|
for pat in self.configuration.include_files:
|
|
if fnmatch.fnmatchcase(basename, pat):
|
|
add = True
|
|
break
|
|
|
|
if not add:
|
|
if self.verbosity > 3:
|
|
sys.stdout.write('Skipped file: %s (did not match any include_files pattern: %s)\n' % (f, self.configuration.include_files))
|
|
|
|
elif self.configuration.exclude_files:
|
|
for pat in self.configuration.exclude_files:
|
|
if fnmatch.fnmatchcase(basename, pat):
|
|
if self.verbosity > 3:
|
|
sys.stdout.write('Skipped file: %s (matched exclude_files pattern: %s)\n' % (f, pat))
|
|
|
|
elif self.verbosity > 2:
|
|
sys.stdout.write('Skipped file: %s\n' % (f,))
|
|
|
|
add = False
|
|
break
|
|
|
|
if add:
|
|
if self.verbosity > 3:
|
|
sys.stdout.write('Adding file: %s for test discovery.\n' % (f,))
|
|
ret.append(f)
|
|
|
|
pyfiles = ret
|
|
|
|
|
|
return pyfiles
|
|
|
|
def __get_module_from_str(self, modname, print_exception, pyfile):
|
|
""" Import the module in the given import path.
|
|
* Returns the "final" module, so importing "coilib40.subject.visu"
|
|
returns the "visu" module, not the "coilib40" as returned by __import__ """
|
|
try:
|
|
mod = __import__(modname)
|
|
for part in modname.split('.')[1:]:
|
|
mod = getattr(mod, part)
|
|
return mod
|
|
except:
|
|
if print_exception:
|
|
from _pydev_runfiles import pydev_runfiles_xml_rpc
|
|
from _pydevd_bundle import pydevd_io
|
|
buf_err = pydevd_io.start_redirect(keep_original_redirection=True, std='stderr')
|
|
buf_out = pydevd_io.start_redirect(keep_original_redirection=True, std='stdout')
|
|
try:
|
|
import traceback;traceback.print_exc()
|
|
sys.stderr.write('ERROR: Module: %s could not be imported (file: %s).\n' % (modname, pyfile))
|
|
finally:
|
|
pydevd_io.end_redirect('stderr')
|
|
pydevd_io.end_redirect('stdout')
|
|
|
|
pydev_runfiles_xml_rpc.notifyTest(
|
|
'error', buf_out.getvalue(), buf_err.getvalue(), pyfile, modname, 0)
|
|
|
|
return None
|
|
|
|
def remove_duplicates_keeping_order(self, seq):
|
|
seen = set()
|
|
seen_add = seen.add
|
|
return [x for x in seq if not (x in seen or seen_add(x))]
|
|
|
|
def find_modules_from_files(self, pyfiles):
|
|
""" returns a list of modules given a list of files """
|
|
#let's make sure that the paths we want are in the pythonpath...
|
|
imports = [(s, self.__importify(s)) for s in pyfiles]
|
|
|
|
sys_path = [os.path.normpath(path) for path in sys.path]
|
|
sys_path = self.remove_duplicates_keeping_order(sys_path)
|
|
|
|
system_paths = []
|
|
for s in sys_path:
|
|
system_paths.append(self.__importify(s, True))
|
|
|
|
ret = []
|
|
for pyfile, imp in imports:
|
|
if imp is None:
|
|
continue #can happen if a file is not a valid module
|
|
choices = []
|
|
for s in system_paths:
|
|
if imp.startswith(s):
|
|
add = imp[len(s) + 1:]
|
|
if add:
|
|
choices.append(add)
|
|
#sys.stdout.write(' ' + add + ' ')
|
|
|
|
if not choices:
|
|
sys.stdout.write('PYTHONPATH not found for file: %s\n' % imp)
|
|
else:
|
|
for i, import_str in enumerate(choices):
|
|
print_exception = i == len(choices) - 1
|
|
mod = self.__get_module_from_str(import_str, print_exception, pyfile)
|
|
if mod is not None:
|
|
ret.append((pyfile, mod, import_str))
|
|
break
|
|
|
|
|
|
return ret
|
|
|
|
#===================================================================================================================
|
|
# GetTestCaseNames
|
|
#===================================================================================================================
|
|
class GetTestCaseNames:
|
|
"""Yes, we need a class for that (cannot use outer context on jython 2.1)"""
|
|
|
|
def __init__(self, accepted_classes, accepted_methods):
|
|
self.accepted_classes = accepted_classes
|
|
self.accepted_methods = accepted_methods
|
|
|
|
def __call__(self, testCaseClass):
|
|
"""Return a sorted sequence of method names found within testCaseClass"""
|
|
testFnNames = []
|
|
className = testCaseClass.__name__
|
|
|
|
if className in self.accepted_classes:
|
|
for attrname in dir(testCaseClass):
|
|
#If a class is chosen, we select all the 'test' methods'
|
|
if attrname.startswith('test') and hasattr(getattr(testCaseClass, attrname), '__call__'):
|
|
testFnNames.append(attrname)
|
|
|
|
else:
|
|
for attrname in dir(testCaseClass):
|
|
#If we have the class+method name, we must do a full check and have an exact match.
|
|
if className + '.' + attrname in self.accepted_methods:
|
|
if hasattr(getattr(testCaseClass, attrname), '__call__'):
|
|
testFnNames.append(attrname)
|
|
|
|
#sorted() is not available in jython 2.1
|
|
testFnNames.sort()
|
|
return testFnNames
|
|
|
|
|
|
def _decorate_test_suite(self, suite, pyfile, module_name):
|
|
import unittest
|
|
if isinstance(suite, unittest.TestSuite):
|
|
add = False
|
|
suite.__pydev_pyfile__ = pyfile
|
|
suite.__pydev_module_name__ = module_name
|
|
|
|
for t in suite._tests:
|
|
t.__pydev_pyfile__ = pyfile
|
|
t.__pydev_module_name__ = module_name
|
|
if self._decorate_test_suite(t, pyfile, module_name):
|
|
add = True
|
|
|
|
return add
|
|
|
|
elif isinstance(suite, unittest.TestCase):
|
|
return True
|
|
|
|
else:
|
|
return False
|
|
|
|
|
|
|
|
def find_tests_from_modules(self, file_and_modules_and_module_name):
|
|
""" returns the unittests given a list of modules """
|
|
#Use our own suite!
|
|
from _pydev_runfiles import pydev_runfiles_unittest
|
|
import unittest
|
|
unittest.TestLoader.suiteClass = pydev_runfiles_unittest.PydevTestSuite
|
|
loader = unittest.TestLoader()
|
|
|
|
ret = []
|
|
if self.files_to_tests:
|
|
for pyfile, m, module_name in file_and_modules_and_module_name:
|
|
accepted_classes = {}
|
|
accepted_methods = {}
|
|
tests = self.files_to_tests[pyfile]
|
|
for t in tests:
|
|
accepted_methods[t] = t
|
|
|
|
loader.getTestCaseNames = self.GetTestCaseNames(accepted_classes, accepted_methods)
|
|
|
|
suite = loader.loadTestsFromModule(m)
|
|
if self._decorate_test_suite(suite, pyfile, module_name):
|
|
ret.append(suite)
|
|
return ret
|
|
|
|
|
|
if self.tests:
|
|
accepted_classes = {}
|
|
accepted_methods = {}
|
|
|
|
for t in self.tests:
|
|
splitted = t.split('.')
|
|
if len(splitted) == 1:
|
|
accepted_classes[t] = t
|
|
|
|
elif len(splitted) == 2:
|
|
accepted_methods[t] = t
|
|
|
|
loader.getTestCaseNames = self.GetTestCaseNames(accepted_classes, accepted_methods)
|
|
|
|
|
|
for pyfile, m, module_name in file_and_modules_and_module_name:
|
|
suite = loader.loadTestsFromModule(m)
|
|
if self._decorate_test_suite(suite, pyfile, module_name):
|
|
ret.append(suite)
|
|
|
|
return ret
|
|
|
|
|
|
def filter_tests(self, test_objs, internal_call=False):
|
|
""" based on a filter name, only return those tests that have
|
|
the test case names that match """
|
|
import unittest
|
|
if not internal_call:
|
|
if not self.configuration.include_tests and not self.tests and not self.configuration.exclude_tests:
|
|
#No need to filter if we have nothing to filter!
|
|
return test_objs
|
|
|
|
if self.verbosity > 1:
|
|
if self.configuration.include_tests:
|
|
sys.stdout.write('Tests to include: %s\n' % (self.configuration.include_tests,))
|
|
|
|
if self.tests:
|
|
sys.stdout.write('Tests to run: %s\n' % (self.tests,))
|
|
|
|
if self.configuration.exclude_tests:
|
|
sys.stdout.write('Tests to exclude: %s\n' % (self.configuration.exclude_tests,))
|
|
|
|
test_suite = []
|
|
for test_obj in test_objs:
|
|
|
|
if isinstance(test_obj, unittest.TestSuite):
|
|
#Note: keep the suites as they are and just 'fix' the tests (so, don't use the iter_tests).
|
|
if test_obj._tests:
|
|
test_obj._tests = self.filter_tests(test_obj._tests, True)
|
|
if test_obj._tests: #Only add the suite if we still have tests there.
|
|
test_suite.append(test_obj)
|
|
|
|
elif isinstance(test_obj, unittest.TestCase):
|
|
try:
|
|
testMethodName = test_obj._TestCase__testMethodName
|
|
except AttributeError:
|
|
#changed in python 2.5
|
|
testMethodName = test_obj._testMethodName
|
|
|
|
add = True
|
|
if self.configuration.exclude_tests:
|
|
for pat in self.configuration.exclude_tests:
|
|
if fnmatch.fnmatchcase(testMethodName, pat):
|
|
if self.verbosity > 3:
|
|
sys.stdout.write('Skipped test: %s (matched exclude_tests pattern: %s)\n' % (testMethodName, pat))
|
|
|
|
elif self.verbosity > 2:
|
|
sys.stdout.write('Skipped test: %s\n' % (testMethodName,))
|
|
|
|
add = False
|
|
break
|
|
|
|
if add:
|
|
if self.__match_tests(self.tests, test_obj, testMethodName):
|
|
include = True
|
|
if self.configuration.include_tests:
|
|
include = False
|
|
for pat in self.configuration.include_tests:
|
|
if fnmatch.fnmatchcase(testMethodName, pat):
|
|
include = True
|
|
break
|
|
if include:
|
|
test_suite.append(test_obj)
|
|
else:
|
|
if self.verbosity > 3:
|
|
sys.stdout.write('Skipped test: %s (did not match any include_tests pattern %s)\n' % (
|
|
testMethodName, self.configuration.include_tests,))
|
|
return test_suite
|
|
|
|
|
|
def iter_tests(self, test_objs):
|
|
#Note: not using yield because of Jython 2.1.
|
|
import unittest
|
|
tests = []
|
|
for test_obj in test_objs:
|
|
if isinstance(test_obj, unittest.TestSuite):
|
|
tests.extend(self.iter_tests(test_obj._tests))
|
|
|
|
elif isinstance(test_obj, unittest.TestCase):
|
|
tests.append(test_obj)
|
|
return tests
|
|
|
|
|
|
def list_test_names(self, test_objs):
|
|
names = []
|
|
for tc in self.iter_tests(test_objs):
|
|
try:
|
|
testMethodName = tc._TestCase__testMethodName
|
|
except AttributeError:
|
|
#changed in python 2.5
|
|
testMethodName = tc._testMethodName
|
|
names.append(testMethodName)
|
|
return names
|
|
|
|
|
|
def __match_tests(self, tests, test_case, test_method_name):
|
|
if not tests:
|
|
return 1
|
|
|
|
for t in tests:
|
|
class_and_method = t.split('.')
|
|
if len(class_and_method) == 1:
|
|
#only class name
|
|
if class_and_method[0] == test_case.__class__.__name__:
|
|
return 1
|
|
|
|
elif len(class_and_method) == 2:
|
|
if class_and_method[0] == test_case.__class__.__name__ and class_and_method[1] == test_method_name:
|
|
return 1
|
|
|
|
return 0
|
|
|
|
|
|
def __match(self, filter_list, name):
|
|
""" returns whether a test name matches the test filter """
|
|
if filter_list is None:
|
|
return 1
|
|
for f in filter_list:
|
|
if re.match(f, name):
|
|
return 1
|
|
return 0
|
|
|
|
|
|
def run_tests(self, handle_coverage=True):
|
|
""" runs all tests """
|
|
sys.stdout.write("Finding files... ")
|
|
files = self.find_import_files()
|
|
if self.verbosity > 3:
|
|
sys.stdout.write('%s ... done.\n' % (self.files_or_dirs))
|
|
else:
|
|
sys.stdout.write('done.\n')
|
|
sys.stdout.write("Importing test modules ... ")
|
|
|
|
|
|
if handle_coverage:
|
|
coverage_files, coverage = start_coverage_support(self.configuration)
|
|
|
|
file_and_modules_and_module_name = self.find_modules_from_files(files)
|
|
sys.stdout.write("done.\n")
|
|
|
|
all_tests = self.find_tests_from_modules(file_and_modules_and_module_name)
|
|
all_tests = self.filter_tests(all_tests)
|
|
|
|
from _pydev_runfiles import pydev_runfiles_unittest
|
|
test_suite = pydev_runfiles_unittest.PydevTestSuite(all_tests)
|
|
from _pydev_runfiles import pydev_runfiles_xml_rpc
|
|
pydev_runfiles_xml_rpc.notifyTestsCollected(test_suite.countTestCases())
|
|
|
|
start_time = time.time()
|
|
|
|
def run_tests():
|
|
executed_in_parallel = False
|
|
if self.jobs > 1:
|
|
from _pydev_runfiles import pydev_runfiles_parallel
|
|
|
|
#What may happen is that the number of jobs needed is lower than the number of jobs requested
|
|
#(e.g.: 2 jobs were requested for running 1 test) -- in which case execute_tests_in_parallel will
|
|
#return False and won't run any tests.
|
|
executed_in_parallel = pydev_runfiles_parallel.execute_tests_in_parallel(
|
|
all_tests, self.jobs, self.split_jobs, self.verbosity, coverage_files, self.configuration.coverage_include)
|
|
|
|
if not executed_in_parallel:
|
|
#If in coverage, we don't need to pass anything here (coverage is already enabled for this execution).
|
|
runner = pydev_runfiles_unittest.PydevTextTestRunner(stream=sys.stdout, descriptions=1, verbosity=self.verbosity)
|
|
sys.stdout.write('\n')
|
|
runner.run(test_suite)
|
|
|
|
if self.configuration.django:
|
|
MyDjangoTestSuiteRunner(run_tests).run_tests([])
|
|
else:
|
|
run_tests()
|
|
|
|
if handle_coverage:
|
|
coverage.stop()
|
|
coverage.save()
|
|
|
|
total_time = 'Finished in: %.2f secs.' % (time.time() - start_time,)
|
|
pydev_runfiles_xml_rpc.notifyTestRunFinished(total_time)
|
|
|
|
|
|
try:
|
|
# django >= 1.8
|
|
import django
|
|
from django.test.runner import DiscoverRunner
|
|
|
|
class MyDjangoTestSuiteRunner(DiscoverRunner):
|
|
|
|
def __init__(self, on_run_suite):
|
|
django.setup()
|
|
DiscoverRunner.__init__(self)
|
|
self.on_run_suite = on_run_suite
|
|
|
|
def build_suite(self, *args, **kwargs):
|
|
pass
|
|
|
|
def suite_result(self, *args, **kwargs):
|
|
pass
|
|
|
|
def run_suite(self, *args, **kwargs):
|
|
self.on_run_suite()
|
|
except:
|
|
# django < 1.8
|
|
try:
|
|
from django.test.simple import DjangoTestSuiteRunner
|
|
except:
|
|
class DjangoTestSuiteRunner:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def run_tests(self, *args, **kwargs):
|
|
raise AssertionError("Unable to run suite with django.test.runner.DiscoverRunner nor django.test.simple.DjangoTestSuiteRunner because it couldn't be imported.")
|
|
|
|
class MyDjangoTestSuiteRunner(DjangoTestSuiteRunner):
|
|
|
|
def __init__(self, on_run_suite):
|
|
DjangoTestSuiteRunner.__init__(self)
|
|
self.on_run_suite = on_run_suite
|
|
|
|
def build_suite(self, *args, **kwargs):
|
|
pass
|
|
|
|
def suite_result(self, *args, **kwargs):
|
|
pass
|
|
|
|
def run_suite(self, *args, **kwargs):
|
|
self.on_run_suite()
|
|
|
|
|
|
#=======================================================================================================================
|
|
# main
|
|
#=======================================================================================================================
|
|
def main(configuration):
|
|
PydevTestRunner(configuration).run_tests()
|