Source code for robot.running.runner

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from robot.errors import ExecutionStatus, DataError, PassExecution
from robot.model import SuiteVisitor
from robot.result import TestSuite, Result
from robot.utils import get_timestamp, is_list_like, NormalizedDict, unic
from robot.variables import VariableScopes

from .context import EXECUTION_CONTEXTS
from .steprunner import StepRunner
from .namespace import Namespace
from .status import SuiteStatus, TestStatus
from .timeouts import TestTimeout


# Some 'extract method' love needed here. Perhaps even 'extract class'.

[docs]class Runner(SuiteVisitor): def __init__(self, output, settings): self.result = None self._output = output self._settings = settings self._variables = VariableScopes(settings) self._suite = None self._suite_status = None self._executed_tests = None @property def _context(self): return EXECUTION_CONTEXTS.current
[docs] def start_suite(self, suite): self._output.library_listeners.new_suite_scope() result = TestSuite(source=suite.source, name=suite.name, doc=suite.doc, metadata=suite.metadata, starttime=get_timestamp(), rpa=self._settings.rpa) if not self.result: result.set_criticality(self._settings.critical_tags, self._settings.non_critical_tags) self.result = Result(root_suite=result, rpa=self._settings.rpa) self.result.configure(status_rc=self._settings.status_rc, stat_config=self._settings.statistics_config) else: self._suite.suites.append(result) self._suite = result self._suite_status = SuiteStatus(self._suite_status, self._settings.exit_on_failure, self._settings.exit_on_error, self._settings.skip_teardown_on_exit) ns = Namespace(self._variables, result, suite.resource) ns.start_suite() ns.variables.set_from_variable_table(suite.resource.variables) EXECUTION_CONTEXTS.start_suite(result, ns, self._output, self._settings.dry_run) self._context.set_suite_variables(result) if not self._suite_status.failures: ns.handle_imports() ns.variables.resolve_delayed() result.doc = self._resolve_setting(result.doc) result.metadata = [(self._resolve_setting(n), self._resolve_setting(v)) for n, v in result.metadata.items()] self._context.set_suite_variables(result) self._output.start_suite(ModelCombiner(suite, result, tests=suite.tests, suites=suite.suites, test_count=suite.test_count)) self._output.register_error_listener(self._suite_status.error_occurred) self._run_setup(suite.keywords.setup, self._suite_status) self._executed_tests = NormalizedDict(ignore='_')
def _resolve_setting(self, value): if is_list_like(value): return self._variables.replace_list(value, ignore_errors=True) return self._variables.replace_string(value, ignore_errors=True)
[docs] def end_suite(self, suite): self._suite.message = self._suite_status.message self._context.report_suite_status(self._suite.status, self._suite.full_message) with self._context.suite_teardown(): failure = self._run_teardown(suite.keywords.teardown, self._suite_status) if failure: self._suite.suite_teardown_failed(unic(failure)) if self._suite.statistics.critical.failed: self._suite_status.critical_failure_occurred() self._suite.endtime = get_timestamp() self._suite.message = self._suite_status.message self._context.end_suite(ModelCombiner(suite, self._suite)) self._suite = self._suite.parent self._suite_status = self._suite_status.parent self._output.library_listeners.discard_suite_scope()
[docs] def visit_test(self, test): if test.name in self._executed_tests: self._output.warn("Multiple test cases with name '%s' executed in " "test suite '%s'." % (test.name, self._suite.longname)) self._executed_tests[test.name] = True result = self._suite.tests.create(name=self._resolve_setting(test.name), doc=self._resolve_setting(test.doc), tags=self._resolve_setting(test.tags), starttime=get_timestamp(), timeout=self._get_timeout(test)) self._context.start_test(result) self._output.start_test(ModelCombiner(test, result)) status = TestStatus(self._suite_status, result) if status.exit: self._add_exit_combine() result.tags.add('robot:exit') if not status.failures and not test.name: status.test_failed('Test case name cannot be empty.') if not status.failures and not test.keywords.normal: status.test_failed('Test case contains no keywords.') self._run_setup(test.keywords.setup, status, result) try: if not status.failures: StepRunner(self._context, test.template).run_steps(test.keywords.normal) else: status.test_failed(status.message) except PassExecution as exception: err = exception.earlier_failures if err: status.test_failed(err) else: result.message = exception.message except ExecutionStatus as err: status.test_failed(err) result.status = status.status result.message = status.message or result.message if status.teardown_allowed: with self._context.test_teardown(result): failure = self._run_teardown(test.keywords.teardown, status, result) if failure and result.critical: status.critical_failure_occurred() if not status.failures and result.timeout and result.timeout.timed_out(): status.test_failed(result.timeout.get_message()) result.message = status.message result.status = status.status result.endtime = get_timestamp() self._output.end_test(ModelCombiner(test, result)) self._context.end_test(result)
def _add_exit_combine(self): exit_combine = ('NOT robot:exit', '') if exit_combine not in self._settings['TagStatCombine']: self._settings['TagStatCombine'].append(exit_combine) def _get_timeout(self, test): if not test.timeout: return None return TestTimeout(test.timeout, self._variables, rpa=test.parent.rpa) def _run_setup(self, setup, status, result=None): if not status.failures: exception = self._run_setup_or_teardown(setup) status.setup_executed(exception) if result and isinstance(exception, PassExecution): result.message = exception.message def _run_teardown(self, teardown, status, result=None): if status.teardown_allowed: exception = self._run_setup_or_teardown(teardown) status.teardown_executed(exception) failed = not isinstance(exception, PassExecution) if result and exception: result.message = status.message if failed else exception.message return exception if failed else None def _run_setup_or_teardown(self, data): if not data: return None try: name = self._variables.replace_string(data.name) except DataError as err: if self._settings.dry_run: return None return err if name.upper() in ('', 'NONE'): return None try: StepRunner(self._context).run_step(data, name=name) except ExecutionStatus as err: return err
[docs]class ModelCombiner(object): def __init__(self, data, result, **priority): self.data = data self.result = result self.priority = priority def __getattr__(self, name): if name in self.priority: return self.priority[name] if hasattr(self.result, name): return getattr(self.result, name) if hasattr(self.data, name): return getattr(self.data, name) raise AttributeError(name)