Source code for robot.running.context

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import sys
import inspect
import asyncio
from contextlib import contextmanager

from robot.errors import DataError


[docs]class Asynchronous: def __init__(self): self._loop_ref = None @property def event_loop(self): if self._loop_ref is None: self._loop_ref = asyncio.new_event_loop() return self._loop_ref
[docs] def close_loop(self): if self._loop_ref: self._loop_ref.close()
[docs] def run_until_complete(self, coroutine): return self.event_loop.run_until_complete(coroutine)
[docs] def is_loop_required(self, obj): return inspect.iscoroutine(obj) and not self._is_loop_running()
def _is_loop_running(self): # ensure 3.6 compatibility if sys.version_info.minor == 6: return asyncio._get_running_loop() is not None try: asyncio.get_running_loop() except RuntimeError: return False else: return True
[docs]class ExecutionContexts: def __init__(self): self._contexts = [] self._asynchronous = Asynchronous() @property def current(self): return self._contexts[-1] if self._contexts else None @property def top(self): return self._contexts[0] if self._contexts else None def __iter__(self): return iter(self._contexts) @property def namespaces(self): return (context.namespace for context in self)
[docs] def start_suite(self, suite, namespace, output, dry_run=False): ctx = _ExecutionContext(suite, namespace, output, dry_run, self._asynchronous) self._contexts.append(ctx) return ctx
[docs] def end_suite(self): self._contexts.pop() if not self._contexts: self._asynchronous.close_loop()
# This is ugly but currently needed e.g. by BuiltIn EXECUTION_CONTEXTS = ExecutionContexts() class _ExecutionContext: _started_keywords_threshold = 100 def __init__(self, suite, namespace, output, dry_run=False, asynchronous=None): self.suite = suite self.test = None self.timeouts = set() self.namespace = namespace self.output = output self.dry_run = dry_run self.in_suite_teardown = False self.in_test_teardown = False self.in_keyword_teardown = 0 self.timeout_occurred = False self.steps = [] self.user_keywords = [] self.asynchronous = asynchronous @contextmanager def suite_teardown(self): self.in_suite_teardown = True try: yield finally: self.in_suite_teardown = False @contextmanager def test_teardown(self, test): self.variables.set_test('${TEST_STATUS}', test.status) self.variables.set_test('${TEST_MESSAGE}', test.message) self.in_test_teardown = True self._remove_timeout(test.timeout) try: yield finally: self.in_test_teardown = False @contextmanager def keyword_teardown(self, error): self.variables.set_keyword('${KEYWORD_STATUS}', 'FAIL' if error else 'PASS') self.variables.set_keyword('${KEYWORD_MESSAGE}', str(error or '')) self.in_keyword_teardown += 1 try: yield finally: self.in_keyword_teardown -= 1 @contextmanager def user_keyword(self, handler): self.user_keywords.append(handler) self.namespace.start_user_keyword() try: yield finally: self.namespace.end_user_keyword() self.user_keywords.pop() def warn_on_invalid_private_call(self, handler): parent = self.user_keywords[-1] if self.user_keywords else None if not parent or parent.source != handler.source: self.warn(f"Keyword '{handler.longname}' is private and should only " f"be called by keywords in the same file.") @contextmanager def timeout(self, timeout): self._add_timeout(timeout) try: yield finally: self._remove_timeout(timeout) @property def in_teardown(self): return bool(self.in_suite_teardown or self.in_test_teardown or self.in_keyword_teardown) @property def variables(self): return self.namespace.variables def continue_on_failure(self, default=False): parents = ([self.test] if self.test else []) + self.user_keywords for index, parent in enumerate(reversed(parents)): if (parent.tags.robot('recursive-stop-on-failure') or index == 0 and parent.tags.robot('stop-on-failure')): return False if (parent.tags.robot('recursive-continue-on-failure') or index == 0 and parent.tags.robot('continue-on-failure')): return True return default or self.in_teardown @property def allow_loop_control(self): for step in reversed(self.steps): if step.type == 'ITERATION': return True if step.type == 'KEYWORD' and step.libname != 'BuiltIn': return False return False def end_suite(self, suite): for name in ['${PREV_TEST_NAME}', '${PREV_TEST_STATUS}', '${PREV_TEST_MESSAGE}']: self.variables.set_global(name, self.variables[name]) self.output.end_suite(suite) self.namespace.end_suite(suite) EXECUTION_CONTEXTS.end_suite() def set_suite_variables(self, suite): self.variables['${SUITE_NAME}'] = suite.longname self.variables['${SUITE_SOURCE}'] = str(suite.source or '') self.variables['${SUITE_DOCUMENTATION}'] = suite.doc self.variables['${SUITE_METADATA}'] = suite.metadata.copy() def report_suite_status(self, status, message): self.variables['${SUITE_STATUS}'] = status self.variables['${SUITE_MESSAGE}'] = message def start_test(self, test): self.test = test self._add_timeout(test.timeout) self.namespace.start_test() self.variables.set_test('${TEST_NAME}', test.name) self.variables.set_test('${TEST_DOCUMENTATION}', test.doc) self.variables.set_test('@{TEST_TAGS}', list(test.tags)) def _add_timeout(self, timeout): if timeout: timeout.start() self.timeouts.add(timeout) def _remove_timeout(self, timeout): if timeout in self.timeouts: self.timeouts.remove(timeout) def end_test(self, test): self.test = None self._remove_timeout(test.timeout) self.namespace.end_test() self.variables.set_suite('${PREV_TEST_NAME}', test.name) self.variables.set_suite('${PREV_TEST_STATUS}', test.status) self.variables.set_suite('${PREV_TEST_MESSAGE}', test.message) self.timeout_occurred = False def start_keyword(self, keyword): self.steps.append(keyword) if len(self.steps) > self._started_keywords_threshold: raise DataError('Maximum limit of started keywords and control ' 'structures exceeded.') self.output.start_keyword(keyword) def end_keyword(self, keyword): self.output.end_keyword(keyword) self.steps.pop() def get_runner(self, name): return self.namespace.get_runner(name) def trace(self, message): self.output.trace(message) def debug(self, message): self.output.debug(message) def info(self, message): self.output.info(message) def warn(self, message): self.output.warn(message) def fail(self, message): self.output.fail(message) def skip(self, message): self.output.skip(message)