mirror of
https://gitflic.ru/project/openide/openide.git
synced 2025-12-16 22:51:17 +07:00
(cherry picked from commit a423dc858b89cb8910f771fda7a3480d3a726ca5) IJ-MR-172503 GitOrigin-RevId: ec33e71fc5adc78be4f133a177903381c7e45099
337 lines
13 KiB
Python
337 lines
13 KiB
Python
# coding=utf-8
|
|
"""
|
|
Behave BDD runner.
|
|
See _bdd_utils#get_path_by_env for information how to pass list of features here.
|
|
Each feature could be file, folder with feature files or folder with "features" subfolder
|
|
|
|
Other args are tag expressionsin format (--tags=.. --tags=..).
|
|
See https://pythonhosted.org/behave/behave.html#tag-expression
|
|
"""
|
|
|
|
import functools
|
|
import glob
|
|
import re
|
|
import sys
|
|
import traceback
|
|
from behave.formatter.base import Formatter
|
|
from behave.model import Step, ScenarioOutline, Feature, Scenario
|
|
from behave.formatter import _registry
|
|
from behave.tag_expression import TagExpression
|
|
from _jb_django_behave import run_as_django_behave
|
|
import _bdd_utils
|
|
import tcmessages
|
|
from _jb_utils import VersionAgnosticUtils
|
|
|
|
_MAX_STEPS_SEARCH_FEATURES = 5000 # Do not look for features in folder that has more that this number of children
|
|
_FEATURES_FOLDER = 'features' # "features" folder name.
|
|
|
|
__author__ = 'Ilya.Kazakevich'
|
|
|
|
from behave import configuration, runner
|
|
|
|
import os
|
|
|
|
|
|
def _get_dirs_to_run(base_dir_to_search):
|
|
"""
|
|
Searches for "features" dirs in some base_dir
|
|
:return: list of feature dirs to run
|
|
:rtype: list
|
|
:param base_dir_to_search root directory to search (should not have too many children!)
|
|
:type base_dir_to_search str
|
|
|
|
"""
|
|
result = set()
|
|
for (step, (folder, sub_folders, files)) in enumerate(os.walk(base_dir_to_search)):
|
|
if os.path.basename(folder) == _FEATURES_FOLDER and os.path.isdir(folder):
|
|
result.add(os.path.abspath(folder))
|
|
if step == _MAX_STEPS_SEARCH_FEATURES: # Guard
|
|
err = "Folder {0} is too deep to find any features folder. Please provider concrete folder".format(
|
|
base_dir_to_search)
|
|
raise Exception(err)
|
|
return list(result)
|
|
|
|
|
|
def _merge_hooks_wrapper(*hooks):
|
|
"""
|
|
Creates wrapper that runs provided behave hooks sequentally
|
|
:param hooks: hooks to run
|
|
:return: wrapper
|
|
"""
|
|
# TODO: Wheel reinvented!!!!
|
|
def wrapper(*args, **kwargs):
|
|
for hook in hooks:
|
|
hook(*args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
|
|
class _RunnerWrapper(runner.Runner):
|
|
"""
|
|
Wrapper around behave native wrapper. Has nothing todo with BddRunner!
|
|
We need it to support dry runs (to fetch data from scenarios) and hooks api
|
|
"""
|
|
|
|
def __init__(self, config, hooks):
|
|
"""
|
|
:type config configuration.Configuration
|
|
:param config behave configuration
|
|
:type hooks dict or empty if new runner mode
|
|
:param hooks hooks in format "before_scenario" => f(context, scenario) to load after/before hooks, provided by user
|
|
"""
|
|
super(_RunnerWrapper, self).__init__(config)
|
|
self.dry_run = False
|
|
"""
|
|
Does not run tests (only fetches "self.features") if true. Runs tests otherwise.
|
|
"""
|
|
self.__hooks = hooks
|
|
|
|
def load_hooks(self, filename='environment.py'):
|
|
"""
|
|
Overrides parent "load_hooks" to add "self.__hooks"
|
|
:param filename: env. file name
|
|
"""
|
|
super(_RunnerWrapper, self).load_hooks(filename)
|
|
for (hook_name, hook) in self.__hooks.items():
|
|
hook_to_add = hook
|
|
if hook_name in self.hooks:
|
|
user_hook = self.hooks[hook_name]
|
|
if hook_name.startswith("before"):
|
|
user_and_custom_hook = [user_hook, hook]
|
|
else:
|
|
user_and_custom_hook = [hook, user_hook]
|
|
hook_to_add = _merge_hooks_wrapper(*user_and_custom_hook)
|
|
self.hooks[hook_name] = hook_to_add
|
|
|
|
def run_model(self, features=None):
|
|
"""
|
|
Overrides parent method to stop (do nothing) in case of "dry_run"
|
|
:param features: features to run
|
|
:return:
|
|
"""
|
|
if self.dry_run: # To stop further execution
|
|
return
|
|
return super(_RunnerWrapper, self).run_model(features)
|
|
|
|
def clean(self):
|
|
"""
|
|
Cleans runner after dry run (clears hooks, features etc). To be called before real run!
|
|
"""
|
|
self.dry_run = False
|
|
self.hooks.clear()
|
|
self.features = []
|
|
|
|
|
|
class _BehaveRunner(_bdd_utils.BddRunner):
|
|
"""
|
|
BddRunner for behave
|
|
"""
|
|
|
|
def __process_hook(self, is_started, context, element):
|
|
"""
|
|
Hook to be installed. Reports steps, features etc.
|
|
:param is_started true if test/feature/scenario is started
|
|
:type is_started bool
|
|
:param context behave context
|
|
:type context behave.runner.Context
|
|
:param element feature/suite/step
|
|
"""
|
|
element.location.file = element.location.filename # To preserve _bdd_utils contract
|
|
utils = VersionAgnosticUtils()
|
|
if isinstance(element, Step):
|
|
# Process step
|
|
step_name = u"{0} {1}".format(utils.to_unicode(element.keyword), utils.to_unicode(element.name))
|
|
duration_ms = element.duration * 1000
|
|
if is_started:
|
|
self._test_started(step_name, element.location)
|
|
elif element.status == 'passed':
|
|
self._test_passed(step_name, duration_ms)
|
|
elif element.status == 'failed':
|
|
# Correct way is to use element.errormessage
|
|
# but assertions do not have trace there (due to Behave internals)
|
|
# do, we collect it manually
|
|
error_message = element.error_message
|
|
fetch_log = not error_message # If no error_message provided, need to fetch log manually
|
|
trace = ""
|
|
if isinstance(element.exception, AssertionError) or not error_message:
|
|
trace = self._collect_trace(element, utils)
|
|
|
|
# May be empty https://github.com/behave/behave/issues/468 for some exceptions
|
|
if not trace and not error_message:
|
|
try:
|
|
error_message = traceback.format_exc()
|
|
except AttributeError:
|
|
# Exception may have empty stracktrace, and traceback.format_exc() throws
|
|
# AttributeError in this case
|
|
trace = self._collect_trace(element, utils)
|
|
if not error_message:
|
|
# Format exception as last resort
|
|
error_message = element.exception
|
|
message_as_string = utils.to_unicode(error_message)
|
|
if fetch_log and self.__real_runner.config.log_capture:
|
|
try:
|
|
capture = self.__real_runner.log_capture # 1.2.5
|
|
except AttributeError:
|
|
capture = self.__real_runner.capture_controller.log_capture # 1.2.6
|
|
|
|
message_as_string += u"\n" + utils.to_unicode(capture.getvalue())
|
|
self._test_failed(step_name, message_as_string, trace, duration=duration_ms)
|
|
elif element.status == 'undefined':
|
|
self._test_undefined(step_name, element.location)
|
|
else:
|
|
self._test_skipped(step_name, element.status, element.location)
|
|
elif not is_started and isinstance(element, Scenario) and element.status == 'failed':
|
|
# To process scenarios with undefined/skipped tests
|
|
for step in element.steps:
|
|
assert isinstance(step, Step), step
|
|
if step.status not in ['passed', 'failed']: # Something strange, probably skipped or undefined
|
|
self.__process_hook(False, context, step)
|
|
self._feature_or_scenario(is_started, element.name, element.location)
|
|
elif isinstance(element, ScenarioOutline):
|
|
self._feature_or_scenario(is_started, str(element.examples), element.location)
|
|
else:
|
|
self._feature_or_scenario(is_started, element.name, element.location)
|
|
|
|
def _collect_trace(self, element, utils):
|
|
return u"".join([utils.to_unicode(l) for l in traceback.format_tb(element.exc_traceback)])
|
|
|
|
def __init__(self, config, base_dir, use_old_runner):
|
|
"""
|
|
:type config configuration.Configuration
|
|
"""
|
|
super(_BehaveRunner, self).__init__(base_dir)
|
|
self.__config = config
|
|
# Install hooks
|
|
self.__real_runner = _RunnerWrapper(config, {
|
|
"before_feature": functools.partial(self.__process_hook, True),
|
|
"after_feature": functools.partial(self.__process_hook, False),
|
|
"before_scenario": functools.partial(self.__process_hook, True),
|
|
"after_scenario": functools.partial(self.__process_hook, False),
|
|
"before_step": functools.partial(self.__process_hook, True),
|
|
"after_step": functools.partial(self.__process_hook, False)
|
|
} if use_old_runner else dict())
|
|
|
|
def _run_tests(self):
|
|
self.__real_runner.run()
|
|
|
|
def __filter_scenarios_by_args(self, scenario):
|
|
"""
|
|
Filters out scenarios that should be skipped by tags or scenario names
|
|
:param scenario scenario to check
|
|
:return true if should pass
|
|
"""
|
|
assert isinstance(scenario, Scenario), scenario
|
|
expected_tags = self.__config.tags
|
|
scenario_name_re = self.__config.name_re
|
|
if scenario_name_re and not scenario_name_re.match(scenario.name):
|
|
return False
|
|
if not expected_tags:
|
|
return True # No tags nor names are required
|
|
return isinstance(expected_tags, TagExpression) and expected_tags.check(scenario.tags)
|
|
|
|
def _get_features_to_run(self):
|
|
self.__real_runner.dry_run = True
|
|
self.__real_runner.run()
|
|
features_to_run = self.__real_runner.features
|
|
self.__real_runner.clean() # To make sure nothing left after dry run
|
|
|
|
# Change outline scenario skeletons with real scenarios
|
|
for feature in features_to_run:
|
|
assert isinstance(feature, Feature), feature
|
|
scenarios = []
|
|
for scenario in feature.walk_scenarios():
|
|
try:
|
|
scenario.tags.extend(feature.tags)
|
|
except AttributeError:
|
|
pass
|
|
if isinstance(scenario, ScenarioOutline):
|
|
scenarios.extend(scenario.scenarios)
|
|
else:
|
|
scenarios.append(scenario)
|
|
feature.scenarios = filter(self.__filter_scenarios_by_args, scenarios)
|
|
|
|
return features_to_run
|
|
|
|
|
|
def _register_null_formatter(format_name):
|
|
class _Null(Formatter):
|
|
"""
|
|
Null formater to prevent stdout output
|
|
"""
|
|
pass
|
|
|
|
_registry.register_as(format_name, _Null)
|
|
|
|
|
|
def _register_teamcity_formatter(format_name, base_dir):
|
|
custom_messages = tcmessages.TeamcityServiceMessages()
|
|
|
|
# Not safe to import it in old mode
|
|
from teamcity.jb_behave_formatter import TeamcityFormatter
|
|
|
|
class TeamcityFormatterWithLocation(TeamcityFormatter):
|
|
|
|
def _report_suite_started(self, suite, suite_name):
|
|
location = suite.location
|
|
custom_messages.testSuiteStarted(
|
|
suite_name,
|
|
_bdd_utils.get_location(base_dir, location.filename, location.line)
|
|
)
|
|
|
|
def _report_test_started(self, test, test_name):
|
|
location = test.location
|
|
custom_messages.testStarted(
|
|
test_name,
|
|
_bdd_utils.get_location(base_dir, location.filename, location.line)
|
|
)
|
|
|
|
_registry.register_as(format_name, TeamcityFormatterWithLocation)
|
|
|
|
|
|
def main():
|
|
# TODO: support all other params instead
|
|
command_args = list(filter(None, sys.argv[1:]))
|
|
if command_args:
|
|
if "--junit" in command_args:
|
|
raise Exception("--junit report type for Behave is unsupported in PyCharm. \n "
|
|
"See: https://youtrack.jetbrains.com/issue/PY-14219")
|
|
_bdd_utils.fix_win_drive(command_args[0])
|
|
(base_dir, scenario_names, what_to_run) = _bdd_utils.get_what_to_run_by_env(os.environ)
|
|
|
|
for scenario_name in scenario_names:
|
|
command_args += ["-n", re.escape(scenario_name)] # TODO : rewite pythonic
|
|
|
|
my_config = configuration.Configuration(command_args=command_args)
|
|
|
|
# New version supports 1.2.6 only
|
|
use_old_runner = "PYCHARM_BEHAVE_OLD_RUNNER" in os.environ
|
|
|
|
format_name = "com.jetbrains.pycharm.formatter"
|
|
if use_old_runner:
|
|
_register_null_formatter(format_name)
|
|
else:
|
|
_register_teamcity_formatter(format_name, base_dir)
|
|
|
|
my_config.format = [format_name] # To prevent output to stdout
|
|
my_config.reporters = [] # To prevent summary to stdout
|
|
my_config.stdout_capture = False # For test output
|
|
my_config.stderr_capture = False # For test output
|
|
features = set()
|
|
for feature in what_to_run:
|
|
if os.path.isfile(feature) or glob.glob(
|
|
os.path.join(feature, "*.feature")): # File of folder with "features" provided, load it
|
|
features.add(feature)
|
|
elif os.path.isdir(feature):
|
|
features |= set(_get_dirs_to_run(feature)) # Find "features" subfolder
|
|
my_config.paths = list(features)
|
|
if what_to_run and not my_config.paths:
|
|
raise Exception("Nothing to run in {0}".format(what_to_run))
|
|
|
|
# Run as Django if supported, run plain otherwise
|
|
if not run_as_django_behave(format_name, what_to_run, command_args):
|
|
_BehaveRunner(my_config, base_dir, use_old_runner).run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|