Source code for robot.running.suiterunner

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from datetime import datetime

from robot.errors import ExecutionFailed, ExecutionStatus, DataError, PassExecution
from robot.model import SuiteVisitor, TagPatterns
from robot.result import (Keyword as KeywordResult, TestCase as TestResult,
                          TestSuite as SuiteResult, Result)
from robot.utils import is_list_like, NormalizedDict, test_or_task
from robot.variables import VariableScopes

from .bodyrunner import BodyRunner, KeywordRunner
from .context import EXECUTION_CONTEXTS
from .model import Keyword as KeywordData, TestCase as TestData, TestSuite as SuiteData
from .namespace import Namespace
from .status import SuiteStatus, TestStatus
from .timeouts import TestTimeout


class SuiteRunner(SuiteVisitor):
    """Visitor that runs suite/test data and builds the matching results.

    For every visited suite a ``SuiteResult`` is created and linked under
    the result of the enclosing suite (or wrapped into a new ``Result``
    when it is the first suite seen). For every visited test a result is
    created under the current suite result and the test's setup, body and
    teardown are run against it.
    """

    def __init__(self, output, settings):
        """Store ``output`` and ``settings`` and initialise run state."""
        self.result = None
        self.suite_result = None
        self.suite_status = None
        self.output = output
        self.settings = settings
        self.variables = VariableScopes(settings)
        # Stack with one name registry per open suite level; used for
        # warning about duplicate suite/test names on the same level.
        self.executed = [NormalizedDict(ignore='_')]
        self.skipped_tags = TagPatterns(settings.skip)

    @property
    def context(self):
        """The currently active execution context."""
        return EXECUTION_CONTEXTS.current

    def start_suite(self, data: SuiteData):
        """Open a new suite: create its result, namespace and run its setup.

        Warns if a suite with the same name was already registered on this
        level (and the parent has a source). The suite setup is only run
        when ``_any_test_run`` reports at least one runnable test.
        """
        settings = self.settings
        siblings = self.executed[-1]
        if data.name in siblings and data.parent.source:
            self.output.warn(f"Multiple suites with name '{data.name}' executed in "
                             f"suite '{data.parent.full_name}'.")
        siblings[data.name] = True
        self.executed.append(NormalizedDict(ignore='_'))
        self.output.library_listeners.new_suite_scope()
        result = SuiteResult(
            source=data.source,
            name=data.name,
            doc=data.doc,
            metadata=data.metadata,
            start_time=datetime.now(),
            rpa=settings.rpa,
        )
        if self.result:
            self.suite_result.suites.append(result)
        else:
            # First suite seen: it becomes the root of the overall result.
            self.result = Result(root_suite=result, rpa=settings.rpa)
            self.result.configure(status_rc=settings.status_rc,
                                  stat_config=settings.statistics_config)
        self.suite_result = result
        # The previous status object becomes the parent of the new one.
        status = SuiteStatus(
            self.suite_status,
            settings.exit_on_failure,
            settings.exit_on_error,
            settings.skip_teardown_on_exit,
        )
        self.suite_status = status
        namespace = Namespace(self.variables, result, data.resource,
                              settings.languages)
        namespace.start_suite()
        namespace.variables.set_from_variable_section(data.resource.variables)
        EXECUTION_CONTEXTS.start_suite(result, namespace, self.output,
                                       settings.dry_run)
        self.context.set_suite_variables(result)
        if not status.failed:
            namespace.handle_imports()
            namespace.variables.resolve_delayed()
        result.doc = self._resolve_setting(result.doc)
        result.metadata = [
            (self._resolve_setting(key), self._resolve_setting(value))
            for key, value in result.metadata.items()
        ]
        # Set again now that doc and metadata have been resolved.
        self.context.set_suite_variables(result)
        self.output.start_suite(data, result)
        self.output.register_error_listener(status.error_occurred)
        self._run_setup(data, status, result, run=self._any_test_run(data))

    def _any_test_run(self, suite: SuiteData):
        """Return True if any test in ``suite.all_tests`` is not skipped/excluded.

        A test counts as skipped or excluded when its tags match the
        ``skipped_tags`` patterns or ``tags.robot('skip')`` /
        ``tags.robot('exclude')`` is true.
        """
        skipped = self.skipped_tags
        all_tags = (test.tags for test in suite.all_tests)
        return any(
            not (skipped.match(tags)
                 or tags.robot('skip')
                 or tags.robot('exclude'))
            for tags in all_tags
        )

    def _resolve_setting(self, value):
        """Replace variables in ``value`` (string or list-like), ignoring errors."""
        variables = self.variables
        replace = (variables.replace_list if is_list_like(value)
                   else variables.replace_string)
        return replace(value, ignore_errors=True)

    def end_suite(self, suite: SuiteData):
        """Close the current suite: run teardown, finalise and pop state."""
        result = self.suite_result
        status = self.suite_status
        result.message = status.message
        self.context.report_suite_status(result.status, result.full_message)
        with self.context.suite_teardown():
            failure = self._run_teardown(suite, status, result)
            if failure:
                report = (result.suite_teardown_skipped if failure.skip
                          else result.suite_teardown_failed)
                report(str(failure))
        result.end_time = datetime.now()
        # Re-read the message after the teardown has been run.
        result.message = status.message
        self.context.end_suite(suite, result)
        self._clear_result(result)
        self.executed.pop()
        # Step back to the enclosing suite.
        self.suite_result = result.parent
        self.suite_status = status.parent
        self.output.library_listeners.discard_suite_scope()

    def visit_test(self, data: TestData):
        """Run a single test: validation, setup, body and teardown.

        Returns immediately, without creating a result, when
        ``data.tags.robot('exclude')`` is true.
        """
        settings = self.settings
        if data.tags.robot('exclude'):
            return
        siblings = self.executed[-1]
        if data.name in siblings:
            self.output.warn(
                test_or_task(f"Multiple {{test}}s with name '{data.name}' executed in "
                             f"suite '{data.parent.full_name}'.", settings.rpa))
        siblings[data.name] = True
        result = self.suite_result.tests.create(
            self._resolve_setting(data.name),
            self._resolve_setting(data.doc),
            self._resolve_setting(data.tags),
            self._get_timeout(data),
            data.lineno,
            start_time=datetime.now(),
        )
        self.context.start_test(data, result)
        status = TestStatus(self.suite_status, result, settings.skip_on_failure,
                            settings.rpa)
        if status.exit:
            self._add_exit_combine()
            result.tags.add('robot:exit')
        if status.passed:
            self._fail_or_skip_before_run(data, status)
        self._run_setup(data, status, result)
        if status.passed:
            runner = BodyRunner(self.context, templated=bool(data.template))
            try:
                runner.run(data, result)
            except PassExecution as passed:
                earlier = passed.earlier_failures
                if earlier:
                    status.test_failed(error=earlier)
                else:
                    result.message = passed.message
            except ExecutionStatus as failure:
                status.test_failed(error=failure)
        elif status.skipped:
            status.test_skipped(status.message)
        else:
            status.test_failed(status.message)
        result.status = status.status
        result.message = status.message or result.message
        with self.context.test_teardown(result):
            self._run_teardown(data, status, result)
        if status.passed and result.timeout and result.timeout.timed_out():
            status.test_failed(result.timeout.get_message())
            result.message = status.message
        if status.skip_on_failure_after_tag_changes:
            result.message = status.message or result.message
        result.status = status.status
        result.end_time = datetime.now()
        already_failed = result.failed
        # TODO: can this be removed to context
        self.output.end_test(data, result)
        # Result turned into a failure only during ``output.end_test``.
        if result.failed and not already_failed:
            status.failure_occurred()
        self.context.end_test(result)
        self._clear_result(result)

    def _fail_or_skip_before_run(self, data: TestData, status: TestStatus):
        """Fail or skip a test before running it, based on its data and tags.

        Sets ``data.error`` for a test with an empty name or an empty body
        when no error is set already. A test with an error is failed;
        otherwise it is skipped if it has the ``robot:skip`` tag or its
        tags match ``skipped_tags``.
        """
        settings = self.settings
        if not data.error:
            if not data.name:
                data.error = 'Test name cannot be empty.'
            elif not data.body:
                data.error = 'Test cannot be empty.'
        if data.error:
            if settings.rpa:
                data.error = data.error.replace('Test', 'Task')
            status.test_failed(data.error)
            return
        if data.tags.robot('skip'):
            reason = "{Test} skipped using 'robot:skip' tag."
        elif self.skipped_tags.match(data.tags):
            reason = "{Test} skipped using '--skip' command line option."
        else:
            return
        status.test_skipped(test_or_task(reason, settings.rpa))

    def _clear_result(self, result: 'SuiteResult|TestResult'):
        """Drop setup, teardown and body content from a finished result."""
        for part in ('setup', 'teardown'):
            if getattr(result, f'has_{part}'):
                setattr(result, part, None)
        if hasattr(result, 'body'):
            result.body.clear()

    def _add_exit_combine(self):
        """Add the ``NOT robot:exit`` tag statistics combine unless present."""
        combines = self.settings['TagStatCombine']
        exit_combine = ('NOT robot:exit', '')
        if exit_combine not in combines:
            combines.append(exit_combine)

    def _get_timeout(self, test: TestData):
        """Return a ``TestTimeout`` for ``test``, or None if it has no timeout."""
        if test.timeout:
            return TestTimeout(test.timeout, self.variables, rpa=test.parent.rpa)
        return None

    def _run_setup(self, item: 'SuiteData|TestData',
                   status: 'SuiteStatus|TestStatus',
                   result: 'SuiteResult|TestResult',
                   run: bool = True):
        """Run the setup of ``item`` if ``run`` is true and status is passing.

        When the setup is not run and the parent status is skipped, this
        status is marked skipped as well.
        """
        if not (run and status.passed):
            if status.parent and status.parent.skipped:
                status.skipped = True
            return
        exception = (self._run_setup_or_teardown(item.setup, result.setup)
                     if item.has_setup else None)
        status.setup_executed(exception)
        if isinstance(exception, PassExecution) and isinstance(result, TestResult):
            result.message = exception.message

    def _run_teardown(self, item: 'SuiteData|TestData',
                      status: 'SuiteStatus|TestStatus',
                      result: 'SuiteResult|TestResult'):
        """Run the teardown of ``item`` if the status allows it.

        Returns the exception raised by the teardown unless it is a
        ``PassExecution``; returns None otherwise, including when the
        teardown is not run at all.
        """
        if not status.teardown_allowed:
            return None
        exception = (self._run_setup_or_teardown(item.teardown, result.teardown)
                     if item.has_teardown else None)
        status.teardown_executed(exception)
        failed = exception and not isinstance(exception, PassExecution)
        if isinstance(result, TestResult) and exception:
            if failed or status.skipped or exception.skip:
                result.message = status.message
            else:
                # Pass execution used in teardown,
                # and it overrides previous failure message
                result.message = exception.message
        return exception if failed else None

    def _run_setup_or_teardown(self, data: KeywordData, result: KeywordResult):
        """Run one setup/teardown keyword and return its failure, if any.

        Returns None when the resolved keyword name is empty or ``NONE``
        (case-insensitively), when resolving the name fails in dry-run
        mode, or when the keyword runs without raising.
        """
        try:
            name = self.variables.replace_string(data.name)
        except DataError as err:
            return (None if self.settings.dry_run
                    else ExecutionFailed(message=err.message))
        if name.upper() in ('', 'NONE'):
            return None
        try:
            KeywordRunner(self.context).run(data, result, name=name)
        except ExecutionStatus as err:
            return err
        return None