Source code for robot.running.suiterunner

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from datetime import datetime

from robot.errors import ExecutionStatus, PassExecution
from robot.model import SuiteVisitor, TagPatterns
from robot.result import (
    Keyword as KeywordResult, Result, TestCase as TestResult, TestSuite as SuiteResult
)
from robot.utils import (
    is_list_like, NormalizedDict, plural_or_not as s, seq2str, test_or_task
)
from robot.variables import VariableScopes

from .bodyrunner import BodyRunner, KeywordRunner
from .context import EXECUTION_CONTEXTS
from .model import Keyword as KeywordData, TestCase as TestData, TestSuite as SuiteData
from .namespace import Namespace
from .status import SuiteStatus, TestStatus
from .timeouts import TestTimeout


class SuiteRunner(SuiteVisitor):
    """Suite visitor that runs suites and tests and builds the result model.

    ``start_suite``, ``visit_test`` and ``end_suite`` create the matching
    result objects, run setups, test bodies and teardowns, and report
    progress through ``output`` and the current execution context.
    """

    def __init__(self, output, settings):
        """Store collaborators and initialize the run-time state.

        :param output: Object used for warnings and start/end notifications.
        :param settings: Execution settings; passed to ``VariableScopes`` and
            read for ``skip``, ``rpa`` and other options in this class.
        """
        self.output = output
        self.settings = settings
        # Root ``Result``; created when the first suite is started.
        self.result = None
        # Result and status objects of the suite currently being run.
        self.suite_result = None
        self.suite_status = None
        self.variables = VariableScopes(settings)
        # Stack of name registries: the last item records the names of the
        # suites and tests already executed directly under the current suite.
        self.executed = [NormalizedDict(ignore="_")]
        self.skipped_tags = TagPatterns(settings.skip)

    @property
    def context(self):
        """The current execution context (``EXECUTION_CONTEXTS.current``)."""
        return EXECUTION_CONTEXTS.current

    def start_suite(self, data: SuiteData):
        """Create the suite result, open its namespace and run suite setup.

        Warns if a suite with the same name was already executed under the
        same parent, provided the parent has a source.
        """
        settings = self.settings
        output = self.output
        siblings = self.executed[-1]
        if data.name in siblings and data.parent.source:
            output.warn(
                f"Multiple suites with name '{data.name}' executed in "
                f"suite '{data.parent.full_name}'."
            )
        siblings[data.name] = True
        # Fresh registry for the children of the suite being started.
        self.executed.append(NormalizedDict(ignore="_"))
        output.library_listeners.new_suite_scope()
        result = SuiteResult(
            source=data.source,
            name=data.name,
            doc=data.doc,
            metadata=data.metadata,
            start_time=datetime.now(),
            rpa=settings.rpa,
        )
        if self.result:
            # A root result already exists, so this is a child suite.
            self.suite_result.suites.append(result)
        else:
            self.result = Result(suite=result, rpa=settings.rpa)
            self.result.configure(
                status_rc=settings.status_rc,
                stat_config=settings.statistics_config,
            )
        self.suite_result = result
        # The previous status is passed in as the new status' parent.
        self.suite_status = SuiteStatus(
            self.suite_status,
            settings.exit_on_failure,
            settings.exit_on_error,
            settings.skip_teardown_on_exit,
        )
        namespace = Namespace(
            self.variables, result, data.resource, settings.languages
        )
        namespace.start_suite()
        namespace.variables.set_from_variable_section(data.resource.variables)
        EXECUTION_CONTEXTS.start_suite(result, namespace, output, settings.dry_run)
        self.context.set_suite_variables(result)
        if not self.suite_status.failed:
            namespace.handle_imports()
            namespace.variables.resolve_delayed()
        # Variables in documentation and metadata are replaced after the
        # import handling above; suite variables are then set again.
        result.doc = self._resolve_setting(result.doc)
        result.metadata = [
            (self._resolve_setting(name), self._resolve_setting(value))
            for name, value in result.metadata.items()
        ]
        self.context.set_suite_variables(result)
        output.start_suite(data, result)
        output.register_error_listener(self.suite_status.error_occurred)
        self._run_setup(
            data,
            self.suite_status,
            self.suite_result,
            run=self._any_test_run(data),
        )

    def _any_test_run(self, suite: SuiteData):
        """Return ``True`` if at least one test in ``suite.all_tests`` will run.

        A test does not count if its tags match ``self.skipped_tags`` or it
        has the ``robot:skip`` or ``robot:exclude`` tag.
        """
        skip_patterns = self.skipped_tags

        def will_run(tags):
            return (
                not skip_patterns.match(tags)
                and not tags.robot("skip")
                and not tags.robot("exclude")
            )

        return any(will_run(test.tags) for test in suite.all_tests)

    def _resolve_setting(self, value):
        """Replace variables in ``value``, ignoring replacement errors.

        List-like values go through ``replace_list``, everything else
        through ``replace_string``.
        """
        replace = (
            self.variables.replace_list
            if is_list_like(value)
            else self.variables.replace_string
        )
        return replace(value, ignore_errors=True)

    def end_suite(self, suite: SuiteData):
        """Run suite teardown, finalize the suite result and restore the parent.

        A teardown failure is recorded on the result either as skipped or as
        failed, depending on the failure's ``skip`` attribute.
        """
        result = self.suite_result
        status = self.suite_status
        result.message = status.message
        self.context.report_suite_status(result.status, result.full_message)
        with self.context.suite_teardown():
            failure = self._run_teardown(suite, status, result)
            if failure:
                record = (
                    result.suite_teardown_skipped
                    if failure.skip
                    else result.suite_teardown_failed
                )
                record(str(failure))
        result.end_time = datetime.now()
        # Set again because the teardown may have changed the status message.
        result.message = status.message
        self.context.end_suite(suite, result)
        self._clear_result(result)
        self.executed.pop()
        self.suite_result = result.parent
        self.suite_status = status.parent
        self.output.library_listeners.discard_suite_scope()

    def visit_test(self, data: TestData):
        """Run a single test: setup, body and teardown, and record its result.

        Tests that have the ``robot:exclude`` tag after variable replacement
        are removed from the results and not run at all.
        """
        settings = self.settings
        rpa = settings.rpa
        result = self.suite_result.tests.create(
            self._resolve_setting(data.name),
            self._resolve_setting(data.doc),
            self._resolve_setting(data.tags),
            self._get_timeout(data),
            data.lineno,
            start_time=datetime.now(),
        )
        if result.tags.robot("exclude"):
            self.suite_result.tests.pop()
            return
        siblings = self.executed[-1]
        if result.name in siblings:
            message = test_or_task(
                f"Multiple {{test}}s with name '{result.name}' executed "
                f"in suite '{result.parent.full_name}'.",
                rpa,
            )
            self.output.warn(message)
        siblings[result.name] = True
        self.context.start_test(data, result)
        status = TestStatus(
            self.suite_status,
            result,
            settings.skip_on_failure,
            rpa,
        )
        if status.exit:
            self._add_exit_combine()
            result.tags.add("robot:exit")
        if status.passed:
            # Validate the test data before anything is executed.
            if not data.error:
                if not data.name:
                    data.error = "Test name cannot be empty."
                elif not data.body:
                    data.error = "Test cannot be empty."
            if data.error:
                if rpa:
                    data.error = data.error.replace("Test", "Task")
                status.test_failed(data.error)
            elif result.tags.robot("skip"):
                status.test_skipped(self._get_skipped_message(["robot:skip"], rpa))
            elif self.skipped_tags.match(result.tags):
                status.test_skipped(
                    self._get_skipped_message(self.skipped_tags, rpa)
                )
        self._run_setup(data, status, result)
        if status.passed:
            body_runner = BodyRunner(self.context, templated=bool(data.template))
            try:
                body_runner.run(data, result)
            except PassExecution as passed:
                # Failures that occurred before the pass still fail the test.
                earlier = passed.earlier_failures
                if earlier:
                    status.test_failed(error=earlier)
                else:
                    result.message = passed.message
            except ExecutionStatus as failure:
                status.test_failed(error=failure)
        elif status.skipped:
            status.test_skipped(status.message)
        else:
            status.test_failed(status.message)
        result.status = status.status
        result.message = status.message or result.message
        with self.context.test_teardown(result):
            self._run_teardown(data, status, result)
        timeout = result.timeout
        if status.passed and timeout and timeout.timed_out():
            status.test_failed(timeout.get_message())
            result.message = status.message
        if status.skip_on_failure_after_tag_changes:
            result.message = status.message or result.message
        result.status = status.status
        result.end_time = datetime.now()
        failed_before_listeners = result.failed
        # TODO: can this be removed to context
        self.output.end_test(data, result)
        # The result can turn into a failure during ``output.end_test``.
        if result.failed and not failed_before_listeners:
            status.failure_occurred()
        self.context.end_test(result)
        self._clear_result(result)

    def _get_skipped_message(self, tags, rpa):
        """Build the message explaining which tags caused a test to be skipped.

        ``tags`` is described as a "tag" unless it has a false
        ``is_constant`` attribute, in which case "tag pattern" is used.
        """
        is_constant = getattr(tags, "is_constant", True)
        kind = "tag" if is_constant else "tag pattern"
        message = f"{{Test}} skipped using {seq2str(tags)} {kind}{s(tags)}."
        return test_or_task(message, rpa)

    def _clear_result(self, result: "SuiteResult|TestResult"):
        """Drop the setup, teardown and body of an already reported result."""
        if result.has_setup:
            result.setup = None
        if result.has_teardown:
            result.teardown = None
        # Not every result has a body, so check before clearing it.
        if hasattr(result, "body"):
            result.body.clear()

    def _add_exit_combine(self):
        """Ensure the ``NOT robot:exit`` combined tag statistic is configured."""
        combines = self.settings["TagStatCombine"]
        exit_combine = ("NOT robot:exit", "")
        if exit_combine not in combines:
            combines.append(exit_combine)

    def _get_timeout(self, test: TestData):
        """Return a ``TestTimeout`` for ``test`` or ``None`` if it has no timeout."""
        if test.timeout:
            return TestTimeout(test.timeout, self.variables, rpa=test.parent.rpa)
        return None

    def _run_setup(
        self,
        item: "SuiteData|TestData",
        status: "SuiteStatus|TestStatus",
        result: "SuiteResult|TestResult",
        run: bool = True,
    ):
        """Run the setup of ``item`` if allowed and report the outcome to ``status``.

        When the setup is not run and the parent status is skipped, ``status``
        is marked skipped as well.
        """
        if not (run and status.passed):
            if status.parent and status.parent.skipped:
                status.skipped = True
            return
        exception = None
        if item.has_setup:
            exception = self._run_setup_or_teardown(item.setup, result.setup)
        # ``setup_executed`` is called even when there is no setup to run.
        status.setup_executed(exception)
        if isinstance(exception, PassExecution) and isinstance(result, TestResult):
            result.message = exception.message

    def _run_teardown(
        self,
        item: "SuiteData|TestData",
        status: "SuiteStatus|TestStatus",
        result: "SuiteResult|TestResult",
    ):
        """Run the teardown of ``item`` if ``status`` allows it.

        :return: The exception raised by the teardown if it counts as a
            failure (anything but ``PassExecution``), otherwise ``None``.
        """
        if not status.teardown_allowed:
            return None
        exception = None
        if item.has_teardown:
            exception = self._run_setup_or_teardown(item.teardown, result.teardown)
        status.teardown_executed(exception)
        failed = exception and not isinstance(exception, PassExecution)
        if isinstance(result, TestResult) and exception:
            if failed or status.skipped or exception.skip:
                result.message = status.message
            else:
                # Pass execution has been used in teardown,
                # and it overrides previous failure message
                result.message = exception.message
        if failed:
            return exception
        return None

    def _run_setup_or_teardown(self, data: KeywordData, result: KeywordResult):
        """Run a setup or teardown keyword.

        :return: The ``ExecutionStatus`` exception raised by the keyword, or
            ``None`` if nothing was raised.
        """
        try:
            keyword_runner = KeywordRunner(self.context)
            keyword_runner.run(data, result, setup_or_teardown=True)
        except ExecutionStatus as error:
            return error
        return None