Files
openide/python/helpers/pycharm/_jb_runner_tools.py
Alex Grönholm fddc5e1b79 PY-56529: Fix test runner replacing "." in test parameter with "::"
GitOrigin-RevId: ab6a6e91d19e1279e191c474cf20247099c5cdf4
2025-02-11 15:44:04 +00:00

387 lines
15 KiB
Python

# coding=utf-8
"""
Tools to implement runners (https://confluence.jetbrains.com/display/~link/PyCharm+test+runners+protocol)
"""
import os
import re
import sys
from collections import OrderedDict
import _jb_utils
from teamcity import teamcity_presence_env_var, messages
# Some runners need it to "detect" TC and start protocol
if teamcity_presence_env_var not in os.environ:
os.environ[teamcity_presence_env_var] = "LOCAL"
# Providing this env variable disables output buffering.
# anything sent to stdout/stderr goes to IDE directly, not after test is over like it is done by default.
# out and err are not in sync, so output may go to wrong test
JB_DISABLE_BUFFERING = "JB_DISABLE_BUFFERING" in os.environ
# getcwd resolves symlinks, but PWD is not supported by some shells
PROJECT_DIR = os.getenv('PWD', os.getcwd())
def _parse_parametrized(part):
"""
Support nose generators / pytest parameters and other functions that provides names like foo(1,2)
Until https://github.com/JetBrains/teamcity-messages/issues/121, all such tests are provided
with parentheses.
Tests with docstring are reported in similar way but they have space before parenthesis and should be ignored
by this function
"""
match = re.match("^([^\\s)(]+)(\\(.+\\))$", part)
if not match:
return [part]
else:
return [match.group(1), match.group(2)]
class _TreeManagerHolder(object):
def __init__(self):
self.parallel = "JB_USE_PARALLEL_TREE_MANAGER" in os.environ
self.offset = 0
self._manager_imp = None
@property
def manager(self):
if not self._manager_imp:
self._fill_manager()
return self._manager_imp
def _fill_manager(self):
if self.parallel:
from _jb_parallel_tree_manager import ParallelTreeManager
self._manager_imp = ParallelTreeManager(self.offset)
else:
from _jb_serial_tree_manager import SerialTreeManager
self._manager_imp = SerialTreeManager(self.offset)
_TREE_MANAGER_HOLDER = _TreeManagerHolder()
def set_parallel_mode():
_TREE_MANAGER_HOLDER.parallel = True
def is_parallel_mode():
return _TREE_MANAGER_HOLDER.parallel
# Monkeypatching TC
_old_service_messages = messages.TeamcityServiceMessages
PARSE_FUNC = None
class NewTeamcityServiceMessages(_old_service_messages):
_latest_subtest_result = None
# [full_test_name] = (test_name, node_id, parent_node_id)
_test_suites = OrderedDict()
INSTANCE = None
def __init__(self, *args, **kwargs):
super(NewTeamcityServiceMessages, self).__init__(*args, **kwargs)
NewTeamcityServiceMessages.INSTANCE = self
def message(self, messageName, **properties):
if messageName in {"enteredTheMatrix", "testCount"}:
if "_jb_do_not_call_enter_matrix" not in os.environ:
_old_service_messages.message(self, messageName, **properties)
return
full_name = properties["name"]
try:
# Report directory so Java site knows which folder to resolve names against
# tests with docstrings are reported in format "test.name (some test here)".
# text should be part of name, but not location.
possible_location = str(full_name)
loc = possible_location.find("(")
if loc > 0:
possible_location = possible_location[:loc].strip()
properties["locationHint"] = "python<{0}>://{1}".format(PROJECT_DIR,
possible_location)
except KeyError:
# If message does not have name, then it is not test
# Simply pass it
_old_service_messages.message(self, messageName, **properties)
return
current, parent = _TREE_MANAGER_HOLDER.manager.get_node_ids(full_name)
if not current and not parent:
return
# Shortcut for name
try:
properties["name"] = str(full_name).split(".")[-1]
except IndexError:
pass
properties["nodeId"] = str(current)
properties["parentNodeId"] = str(parent)
is_test = messageName == "testStarted"
if messageName == "testSuiteStarted" or is_test:
self._test_suites[full_name] = (full_name, current, parent, is_test)
_old_service_messages.message(self, messageName, **properties)
def _test_to_list(self, test_name):
"""
Splits test name to parts to use it as list.
It most cases dot is used, but runner may provide custom function
"""
parts = test_name.split(".")
result = []
for part in parts:
result += _parse_parametrized(part)
return result
def _fix_setup_teardown_name(self, test_name):
"""
Hack to rename setup and teardown methods to much real python signatures
"""
try:
return {"test setup": "setUpClass", "test teardown": "tearDownClass"}[test_name]
except KeyError:
return test_name
# Blocks are used for 2 cases now:
# 1) Unittest subtests (only closed, opened by subTestBlockOpened)
# 2) setup/teardown (does not work, see https://github.com/JetBrains/teamcity-messages/issues/114)
# def blockOpened(self, name, flowId=None):
# self.testStarted(".".join(TREE_MANAGER.current_branch + [self._fix_setup_teardown_name(name)]))
def blockClosed(self, name, flowId=None):
# If _latest_subtest_result is not set or does not exist we closing setup method, not a subtest
try:
if not self._latest_subtest_result:
return
except AttributeError:
return
# closing subtest
test_name = ".".join(_TREE_MANAGER_HOLDER.manager.current_branch)
if self._latest_subtest_result in {"Failure", "Error"}:
self.testFailed(test_name)
if self._latest_subtest_result == "Skip":
self.testIgnored(test_name)
self.testFinished(test_name)
self._latest_subtest_result = None
def subTestBlockOpened(self, name, subTestResult, flowId=None):
self.testStarted(".".join(_TREE_MANAGER_HOLDER.manager.current_branch + [name]))
self._latest_subtest_result = subTestResult
def testStarted(self, testName, captureStandardOutput=None, flowId=None, is_suite=False, metainfo=None):
test_name_as_list = self._test_to_list(testName)
testName = ".".join(test_name_as_list)
def _write_start_message():
# testName, captureStandardOutput, flowId
args = {"name": testName, "captureStandardOutput": captureStandardOutput, "metainfo": metainfo}
if is_suite:
self.message("testSuiteStarted", **args)
else:
self.message("testStarted", **args)
commands = _TREE_MANAGER_HOLDER.manager.level_opened(self._test_to_list(testName), _write_start_message)
if commands:
self.do_commands(commands)
self.testStarted(testName, captureStandardOutput, metainfo=metainfo)
def testFailed(self, testName, message='', details='', flowId=None, comparison_failure=None):
testName = ".".join(self._test_to_list(testName))
_old_service_messages.testFailed(self, testName, message, details, comparison_failure=comparison_failure)
def testFinished(self, testName, testDuration=None, flowId=None, is_suite=False):
test_parts = self._test_to_list(testName)
testName = ".".join(test_parts)
def _write_finished_message():
# testName, captureStandardOutput, flowId
current, parent = _TREE_MANAGER_HOLDER.manager.get_node_ids(testName)
if not current and not parent:
return
args = {"nodeId": current, "parentNodeId": parent, "name": testName}
# TODO: Doc copy/paste with parent, extract
if testDuration is not None:
duration_ms = testDuration.days * 86400000 + \
testDuration.seconds * 1000 + \
int(testDuration.microseconds / 1000)
args["duration"] = str(duration_ms)
if is_suite:
del self._test_suites[testName]
if is_parallel_mode():
del args["duration"]
self.message("testSuiteFinished", **args)
else:
self.message("testFinished", **args)
del self._test_suites[testName]
commands = _TREE_MANAGER_HOLDER.manager.level_closed(
self._test_to_list(testName), _write_finished_message)
if commands:
self.do_commands(commands)
self.testFinished(testName, testDuration)
def do_commands(self, commands):
"""
Executes commands, returned by level_closed and level_opened
"""
for command, test in commands:
test_name = ".".join(test)
# By executing commands we open or close suites(branches) since tests(leaves) are always reported by runner
if command == "open":
self.testStarted(test_name, is_suite=True)
else:
self.testFinished(test_name, is_suite=True)
def _repose_suite_closed(self, suite):
name = suite.full_name
if name:
_old_service_messages.testSuiteFinished(self, ".".join(name))
for child in suite.children.values():
self._repose_suite_closed(child)
def close_suites(self):
# Go in reverse order and close all suites
for (test_suite, node_id, parent_node_id, is_test) in \
reversed(list(self._test_suites.values())):
# suits are explicitly closed, but if test can't been finished, it is skipped
message = "testIgnored" if is_test else "testSuiteFinished"
_old_service_messages.message(self, message, **{
"name": test_suite,
"nodeId": str(node_id),
"parentNodeId": str(parent_node_id)
})
self._test_suites = OrderedDict()
messages.TeamcityServiceMessages = NewTeamcityServiceMessages
# Monkeypatched
def jb_patch_targets(targets, fs_glue, old_python_glue, new_python_glue, fs_to_python_glue, python_parts_action=None):
"""
Converts python targets format provided by Java to python-specific format
:param targets: list of separated by [old_python_glue] or dots targets
:param fs_glue: how to glue fs parts of target. I.e.: module "eggs" in "spam" package is "spam[fs_glue]eggs"
:param new_python_glue: how to glue python parts (glue between class and function etc)
:param old_python_glue: which symbols need to be replaced by [new_python_glue]
:param fs_to_python_glue: between last fs-part and first python part
:param python_parts_action: additional action for python parts
:return: list of targets with patched separators
"""
if not targets:
return []
def _patch_target(target):
# /path/foo.py::parts.to.python
match = re.match("^(:?(.+)[.]py::)?([^\\[]+)(.*)$", target)
assert match, "unexpected string: {0}".format(target)
fs_part = match.group(2)
python_part = match.group(3).replace(old_python_glue, new_python_glue) + match.group(4)
if python_parts_action is not None:
python_part = python_parts_action(fs_part, python_part)
if fs_part:
return fs_part.replace("/", fs_glue) + fs_to_python_glue + python_part
else:
return python_part
return map(_patch_target, targets)
def jb_patch_separator(targets, fs_glue, python_glue, fs_to_python_glue):
"""
Converts python target if format "/path/foo.py::parts.to.python" provided by Java to
python-specific format
:param targets: list of dot-separated targets
:param fs_glue: how to glue fs parts of target. I.e.: module "eggs" in "spam" package is "spam[fs_glue]eggs"
:param python_glue: how to glue python parts (glue between class and function etc)
:param fs_to_python_glue: between last fs-part and first python part
:return: list of targets with patched separators
"""
return jb_patch_targets(targets, fs_glue, '.', python_glue, fs_to_python_glue)
def jb_start_tests():
"""
Parses arguments, starts protocol and fixes syspath and returns tuple of arguments
"""
path, targets, additional_args = parse_arguments()
start_protocol()
return path, targets, additional_args
def jb_finish_tests():
# To be called before process exist to close all suites
instance = NewTeamcityServiceMessages.INSTANCE
# instance may not be set if you run like pytest --version
if instance:
instance.close_suites()
def start_protocol():
properties = {"durationStrategy": "manual"} if is_parallel_mode() else dict()
NewTeamcityServiceMessages().message('enteredTheMatrix', **properties)
def parse_arguments():
"""
Parses arguments, fixes syspath and returns tuple of arguments
:return: (string with path or None, list of targets or None, list of additional arguments)
It may return list with only one element (name itself) if name is the same or split names to several parts
"""
# Handle additional args after --
additional_args = []
try:
index = sys.argv.index("--")
additional_args = sys.argv[index + 1:]
del sys.argv[index:]
except ValueError:
pass
utils = _jb_utils.VersionAgnosticUtils()
namespace = utils.get_options(
_jb_utils.OptionDescription('--path', 'Path to file or folder to run'),
_jb_utils.OptionDescription('--offset', 'Root node offset'),
_jb_utils.OptionDescription('--target', 'Python target to run', "append"))
del sys.argv[1:] # Remove all args
# PyCharm helpers dir is first dir in sys.path because helper is launched.
# But sys.path should be same as when launched with test runner directly
try:
if os.path.abspath(sys.path[0]) == os.path.abspath(
os.environ["PYCHARM_HELPERS_DIR"]):
path = sys.path.pop(0)
if path not in sys.path:
sys.path.append(path)
except KeyError:
pass
_TREE_MANAGER_HOLDER.offset = int(namespace.offset if namespace.offset else 0)
return namespace.path, namespace.target, additional_args
def jb_doc_args(framework_name, args):
"""
Runner encouraged to report its arguments to user with aid of this function
"""
print("Launching {0} with arguments {1} in {2}\n".format(framework_name,
" ".join(args),
PROJECT_DIR))