# Copyright 2008-2015 Nokia Networks
# Copyright 2016- Robot Framework Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import inspect
import sys
from contextlib import contextmanager
from robot.errors import DataError, ExecutionFailed
[docs]
class Asynchronous:
def __init__(self):
self._loop_ref = None
@property
def event_loop(self):
if self._loop_ref is None:
self._loop_ref = asyncio.new_event_loop()
return self._loop_ref
[docs]
def close_loop(self):
if self._loop_ref:
self._loop_ref.close()
[docs]
def run_until_complete(self, coroutine):
task = self.event_loop.create_task(coroutine)
try:
return self.event_loop.run_until_complete(task)
except ExecutionFailed as e:
if e.dont_continue:
task.cancel()
# wait for task and its children to cancel
self.event_loop.run_until_complete(asyncio.gather(task, return_exceptions=True))
raise e
[docs]
def is_loop_required(self, obj):
return inspect.iscoroutine(obj) and not self._is_loop_running()
def _is_loop_running(self):
try:
asyncio.get_running_loop()
except RuntimeError:
return False
else:
return True
[docs]
class ExecutionContexts:
def __init__(self):
self._contexts = []
self._asynchronous = Asynchronous()
@property
def current(self):
return self._contexts[-1] if self._contexts else None
@property
def top(self):
return self._contexts[0] if self._contexts else None
def __iter__(self):
return iter(self._contexts)
@property
def namespaces(self):
return (context.namespace for context in self)
[docs]
def start_suite(self, suite, namespace, output, dry_run=False):
ctx = _ExecutionContext(suite, namespace, output, dry_run, self._asynchronous)
self._contexts.append(ctx)
return ctx
[docs]
def end_suite(self):
self._contexts.pop()
if not self._contexts:
self._asynchronous.close_loop()
# This is ugly but currently needed e.g. by BuiltIn
EXECUTION_CONTEXTS = ExecutionContexts()
class _ExecutionContext:
def __init__(self, suite, namespace, output, dry_run=False, asynchronous=None):
self.suite = suite
self.test = None
self.timeouts = set()
self.namespace = namespace
self.output = output
self.dry_run = dry_run
self.in_suite_teardown = False
self.in_test_teardown = False
self.in_keyword_teardown = 0
self.timeout_occurred = False
self.steps = []
self.user_keywords = []
self.asynchronous = asynchronous
@property
def languages(self):
return self.namespace.languages
@contextmanager
def suite_teardown(self):
self.in_suite_teardown = True
try:
yield
finally:
self.in_suite_teardown = False
@contextmanager
def test_teardown(self, test):
self.variables.set_test('${TEST_STATUS}', test.status)
self.variables.set_test('${TEST_MESSAGE}', test.message)
self.in_test_teardown = True
self._remove_timeout(test.timeout)
try:
yield
finally:
self.in_test_teardown = False
@contextmanager
def keyword_teardown(self, error):
self.variables.set_keyword('${KEYWORD_STATUS}', 'FAIL' if error else 'PASS')
self.variables.set_keyword('${KEYWORD_MESSAGE}', str(error or ''))
self.in_keyword_teardown += 1
try:
yield
finally:
self.in_keyword_teardown -= 1
@contextmanager
def user_keyword(self, handler):
self.user_keywords.append(handler)
self.namespace.start_user_keyword()
try:
yield
finally:
self.namespace.end_user_keyword()
self.user_keywords.pop()
def warn_on_invalid_private_call(self, handler):
parent = self.user_keywords[-1] if self.user_keywords else None
if not parent or parent.source != handler.source:
self.warn(f"Keyword '{handler.full_name}' is private and should only "
f"be called by keywords in the same file.")
@contextmanager
def timeout(self, timeout):
self._add_timeout(timeout)
try:
yield
finally:
self._remove_timeout(timeout)
@property
def in_teardown(self):
return bool(self.in_suite_teardown or
self.in_test_teardown or
self.in_keyword_teardown)
@property
def variables(self):
return self.namespace.variables
def continue_on_failure(self, default=False):
parents = [result for _, result, implementation in reversed(self.steps)
if implementation and implementation.type == 'USER KEYWORD']
if self.test:
parents.append(self.test)
for index, parent in enumerate(parents):
robot = parent.tags.robot
if index == 0 and robot('stop-on-failure'):
return False
if index == 0 and robot('continue-on-failure'):
return True
if robot('recursive-stop-on-failure'):
return False
if robot('recursive-continue-on-failure'):
return True
return default or self.in_teardown
@property
def allow_loop_control(self):
for _, result, _ in reversed(self.steps):
if result.type == 'ITERATION':
return True
if result.type == 'KEYWORD' and result.owner != 'BuiltIn':
return False
return False
def end_suite(self, data, result):
for name in ['${PREV_TEST_NAME}',
'${PREV_TEST_STATUS}',
'${PREV_TEST_MESSAGE}']:
self.variables.set_global(name, self.variables[name])
self.output.end_suite(data, result)
self.namespace.end_suite(data)
EXECUTION_CONTEXTS.end_suite()
def set_suite_variables(self, suite):
self.variables['${SUITE_NAME}'] = suite.full_name
self.variables['${SUITE_SOURCE}'] = str(suite.source or '')
self.variables['${SUITE_DOCUMENTATION}'] = suite.doc
self.variables['${SUITE_METADATA}'] = suite.metadata.copy()
def report_suite_status(self, status, message):
self.variables['${SUITE_STATUS}'] = status
self.variables['${SUITE_MESSAGE}'] = message
def start_test(self, data, result):
self.test = result
self._add_timeout(result.timeout)
self.namespace.start_test()
self.variables.set_test('${TEST_NAME}', result.name)
self.variables.set_test('${TEST_DOCUMENTATION}', result.doc)
self.variables.set_test('@{TEST_TAGS}', list(result.tags))
self.output.start_test(data, result)
def _add_timeout(self, timeout):
if timeout:
timeout.start()
self.timeouts.add(timeout)
def _remove_timeout(self, timeout):
if timeout in self.timeouts:
self.timeouts.remove(timeout)
def end_test(self, test):
self.test = None
self._remove_timeout(test.timeout)
self.namespace.end_test()
self.variables.set_suite('${PREV_TEST_NAME}', test.name)
self.variables.set_suite('${PREV_TEST_STATUS}', test.status)
self.variables.set_suite('${PREV_TEST_MESSAGE}', test.message)
self.timeout_occurred = False
def start_body_item(self, data, result, implementation=None):
self._prevent_execution_close_to_recursion_limit()
self.steps.append((data, result, implementation))
output = self.output
args = (data, result)
if implementation:
if implementation.error:
method = output.start_invalid_keyword
elif implementation.type == implementation.LIBRARY_KEYWORD:
method = output.start_library_keyword
else:
method = output.start_user_keyword
args = (data, implementation, result)
elif result.type in (result.ELSE, result.ITERATION):
method = {
result.IF_ELSE_ROOT: output.start_if_branch,
result.TRY_EXCEPT_ROOT: output.start_try_branch,
result.FOR: output.start_for_iteration,
result.WHILE: output.start_while_iteration,
}[result.parent.type]
else:
method = {
result.FOR: output.start_for,
result.WHILE: output.start_while,
result.GROUP: output.start_group,
result.IF_ELSE_ROOT: output.start_if,
result.IF: output.start_if_branch,
result.ELSE: output.start_if_branch,
result.ELSE_IF: output.start_if_branch,
result.TRY_EXCEPT_ROOT: output.start_try,
result.TRY: output.start_try_branch,
result.EXCEPT: output.start_try_branch,
result.FINALLY: output.start_try_branch,
result.VAR: output.start_var,
result.BREAK: output.start_break,
result.CONTINUE: output.start_continue,
result.RETURN: output.start_return,
result.ERROR: output.start_error,
}[result.type]
method(*args)
def _prevent_execution_close_to_recursion_limit(self):
try:
sys._getframe(sys.getrecursionlimit() - 100)
except (ValueError, AttributeError):
pass
else:
raise DataError('Recursive execution stopped.')
def end_body_item(self, data, result, implementation=None):
output = self.output
args = (data, result)
if implementation:
if implementation.error:
method = output.end_invalid_keyword
elif implementation.type == implementation.LIBRARY_KEYWORD:
method = output.end_library_keyword
else:
method = output.end_user_keyword
args = (data, implementation, result)
elif result.type in (result.ELSE, result.ITERATION):
method = {
result.IF_ELSE_ROOT: output.end_if_branch,
result.TRY_EXCEPT_ROOT: output.end_try_branch,
result.FOR: output.end_for_iteration,
result.WHILE: output.end_while_iteration,
}[result.parent.type]
else:
method = {
result.FOR: output.end_for,
result.WHILE: output.end_while,
result.GROUP: output.end_group,
result.IF_ELSE_ROOT: output.end_if,
result.IF: output.end_if_branch,
result.ELSE: output.end_if_branch,
result.ELSE_IF: output.end_if_branch,
result.TRY_EXCEPT_ROOT: output.end_try,
result.TRY: output.end_try_branch,
result.EXCEPT: output.end_try_branch,
result.FINALLY: output.end_try_branch,
result.VAR: output.end_var,
result.BREAK: output.end_break,
result.CONTINUE: output.end_continue,
result.RETURN: output.end_return,
result.ERROR: output.end_error,
}[result.type]
method(*args)
self.steps.pop()
def get_runner(self, name, recommend_on_failure=True):
return self.namespace.get_runner(name, recommend_on_failure)
def trace(self, message):
self.output.trace(message)
def debug(self, message):
self.output.debug(message)
def info(self, message):
self.output.info(message)
def warn(self, message):
self.output.warn(message)
def fail(self, message):
self.output.fail(message)
def skip(self, message):
self.output.skip(message)