Source code for robot.running.bodyrunner

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import re
import time
from collections import OrderedDict
from contextlib import contextmanager
from datetime import datetime
from itertools import zip_longest

from robot.errors import (
    BreakLoop, ContinueLoop, DataError, ExecutionFailed, ExecutionFailures,
    ExecutionPassed, ExecutionStatus
)
from robot.output import librarylogger as logger
from robot.utils import (
    cut_assign_value, frange, get_error_message, is_list_like, Matcher, normalize,
    plural_or_not as s, secs_to_timestr, seq2str, split_from_equals, timestr_to_secs,
    type_name
)
from robot.variables import evaluate_expression, is_dict_variable, search_variable

from .statusreporter import StatusReporter

DEFAULT_WHILE_LIMIT = 10_000


class BodyRunner:
    """Run the items of a body in order and aggregate their outcomes."""

    def __init__(self, context, run=True, templated=False):
        self._context = context
        self._run = run
        self._templated = templated

    def run(self, data, result):
        """Run every item in ``data.body``.

        Failures are collected while iterating. After the loop a caught
        ``ExecutionPassed`` is re-raised first; otherwise collected failures
        are raised together as ``ExecutionFailures``.
        """
        failures = []
        pass_signal = None
        for step in data.body:
            try:
                step.run(result, self._context, self._run, self._templated)
            except ExecutionPassed as err:
                # Remember the pass signal and stop running the remaining items.
                err.set_earlier_failures(failures)
                pass_signal = err
                self._run = False
            except ExecutionFailed as err:
                failures.extend(err.get_errors())
                # The failure itself decides whether later items still run.
                self._run = err.can_continue(self._context, self._templated)
        if pass_signal:
            raise pass_signal
        if self._templated and failures:
            failures = self._handle_skip_with_templates(failures, result)
        if failures:
            raise ExecutionFailures(failures)

    def _handle_skip_with_templates(self, errors, result):
        """Filter skip errors of templated bodies.

        Returns ``errors`` unchanged when there are fewer than two iterations
        or no skip errors. Raises a skip failure when all iterations are
        skipped; otherwise returns only the non-skip errors.
        """
        iterations = result.body.filter(messages=False)
        if len(iterations) < 2:
            return errors
        if not any(err.skip for err in errors):
            return errors
        if all(iteration.skipped for iteration in iterations):
            raise ExecutionFailed("All iterations skipped.", skip=True)
        return [err for err in errors if not err.skip]
class KeywordRunner:
    """Look up the runner for a keyword call and execute (or dry-run) it."""

    def __init__(self, context, run=True):
        self._context = context
        self._run = run

    def run(self, data, result, setup_or_teardown=False):
        """Run the keyword described by ``data``.

        Returns ``None`` when no runner is found; otherwise returns whatever
        the runner's ``dry_run`` or ``run`` method returns.
        """
        ctx = self._context
        if setup_or_teardown:
            kw_runner = self._get_setup_teardown_runner(data, ctx)
        else:
            kw_runner = ctx.get_runner(data.name, recommend_on_failure=self._run)
        if not kw_runner:
            return None
        if ctx.dry_run:
            return kw_runner.dry_run(data, result, ctx)
        return kw_runner.run(data, result, ctx, self._run)

    def _get_setup_teardown_runner(self, data, context):
        """Resolve the runner for a setup/teardown whose name may contain variables.

        Returns ``None`` if the resolved name is ``NONE`` (case-insensitively)
        or empty, or if variable replacement fails in dry-run mode.
        """
        try:
            resolved = context.variables.replace_string(data.name)
        except DataError as err:
            if context.dry_run:
                return None
            raise ExecutionFailed(err.message)
        if resolved.upper() in ("NONE", ""):
            return None
        # If the matched runner accepts embedded arguments, use the original name
        # instead of the one where variables are already replaced and converted to
        # strings. This allows using non-string values as embedded arguments also
        # in this context. An exact match after variables have been replaced has
        # a precedence over a possible embedded match with the original name, though.
        # BuiltIn.run_keyword has the same logic.
        kw_runner = context.get_runner(resolved, recommend_on_failure=self._run)
        if hasattr(kw_runner, "embedded_args") and resolved != data.name:
            kw_runner = context.get_runner(data.name)
        return kw_runner
def ForRunner(context, flavor="IN", run=True, templated=False):
    """Create the FOR loop runner matching ``flavor``.

    A falsy ``flavor`` selects the plain ``IN`` runner.
    """
    runner_classes = {
        "IN": ForInRunner,
        "IN RANGE": ForInRangeRunner,
        "IN ZIP": ForInZipRunner,
        "IN ENUMERATE": ForInEnumerateRunner,
    }
    runner_class = runner_classes[flavor or "IN"]
    return runner_class(context, run, templated)
class ForInRunner:
    """Runner for the plain ``FOR ... IN ...`` loop and base for other flavors."""

    flavor = "IN"

    def __init__(self, context, run=True, templated=False):
        self._context = context
        self._run = run
        self._templated = templated

    def run(self, data, result):
        """Run the loop described by ``data``, reporting into ``result``.

        If the loop is not run, has a data error, or executes zero rounds,
        one not-run round is created so that the body is still reported.
        """
        error = None
        run = False
        if self._run:
            if data.error:
                error = DataError(data.error, syntax=True)
            else:
                run = True
        with StatusReporter(data, result, self._context, run) as status:
            if run:
                try:
                    assign, types = self._split_types(data)
                    values_for_rounds = self._get_values_for_rounds(data)
                except DataError as err:
                    error = err
                else:
                    if self._run_loop(data, result, assign, types, values_for_rounds):
                        return
                # Either resolving values failed or no round was executed.
                status.pass_status = result.NOT_RUN
            self._no_run_one_round(data, result)
            if error:
                raise error

    def _run_loop(self, data, result, assign, types, values_for_rounds):
        """Run one round per item in ``values_for_rounds``.

        Returns whether at least one round was executed. Collected failures
        are raised after the loop ends.
        """
        errors = []
        executed = False
        for values in values_for_rounds:
            executed = True
            try:
                self._run_one_round(data, result, assign, types, values)
            except (BreakLoop, ContinueLoop) as ctrl:
                if ctrl.earlier_failures:
                    errors.extend(ctrl.earlier_failures.get_errors())
                if isinstance(ctrl, BreakLoop):
                    break
            except ExecutionPassed as passed:
                passed.set_earlier_failures(errors)
                raise passed
            except ExecutionFailed as failed:
                errors.extend(failed.get_errors())
                if not failed.can_continue(self._context, self._templated):
                    break
        if errors:
            if self._templated and len(errors) > 1 and all(e.skip for e in errors):
                raise ExecutionFailed("All iterations skipped.", skip=True)
            raise ExecutionFailures(errors)
        return executed

    def _split_types(self, data):
        """Return loop variable names and their type infos as two parallel lists.

        The type entry is ``None`` for variables without a type.
        """
        from .arguments import TypeInfo

        assign = []
        types = []
        for variable in data.assign:
            match = search_variable(variable, parse_type=True)
            assign.append(match.name)
            try:
                types.append(TypeInfo.from_variable(match) if match.type else None)
            except DataError as err:
                raise DataError(f"Invalid FOR loop variable '{variable}': {err}")
        return assign, types

    def _get_values_for_rounds(self, data):
        """Return an iterable giving the values of each loop round.

        In dry-run mode a single round of empty strings is returned.
        """
        if self._context.dry_run:
            return [[""] * len(data.assign)]
        values_per_round = len(data.assign)
        if self._is_dict_iteration(data.values):
            values = self._resolve_dict_values(data.values)
            values = self._map_dict_values_to_rounds(values, values_per_round)
        else:
            values = self._resolve_values(data.values)
            values = self._map_values_to_rounds(values, values_per_round)
        return values

    def _is_dict_iteration(self, values):
        """Return ``True`` if any of ``values`` is a dictionary variable.

        When every value is in ``name=value`` format, a deprecation warning
        is logged and ``False`` is returned.
        """
        all_name_value = True
        for item in values:
            if is_dict_variable(item):
                return True
            if split_from_equals(item)[1] is None:
                all_name_value = False
        if all_name_value and values:
            name, value = split_from_equals(values[0])
            logger.warn(
                f"FOR loop iteration over values that are all in 'name=value' format "
                f"like '{values[0]}' is deprecated. In the future this syntax will "
                f"mean iterating over names and values separately like when iterating "
                f"over '&{{dict}}' variables. Escape at least one of the values like "
                f"'{name}\\={value}' to use normal FOR loop iteration and to disable "
                f"this warning."
            )
        return False

    def _resolve_dict_values(self, values):
        """Resolve ``&{dict}`` variables and ``key=value`` items into item pairs.

        Later items override earlier ones having the same key.
        """
        result = OrderedDict()
        replace_scalar = self._context.variables.replace_scalar
        for item in values:
            if is_dict_variable(item):
                result.update(replace_scalar(item))
            else:
                key, value = split_from_equals(item)
                if value is None:
                    raise DataError(
                        f"Invalid FOR loop value '{item}'. When iterating "
                        f"over dictionaries, values must be '&{{dict}}' "
                        f"variables or use 'key=value' syntax.",
                        syntax=True,
                    )
                try:
                    result[replace_scalar(key)] = replace_scalar(value)
                except TypeError:
                    # Raised e.g. when the resolved key cannot be used as a dict key.
                    err = get_error_message()
                    raise DataError(f"Invalid dictionary item '{item}': {err}")
        return result.items()

    def _map_dict_values_to_rounds(self, values, per_round):
        """Validate the variable count for dictionary iteration; return ``values``."""
        if per_round > 2:
            raise DataError(
                f"Number of FOR loop variables must be 1 or 2 when iterating "
                f"over dictionaries, got {per_round}.",
                syntax=True,
            )
        return values

    def _resolve_values(self, values):
        """Replace variables in the loop values."""
        return self._context.variables.replace_list(values)

    def _map_values_to_rounds(self, values, per_round):
        """Split ``values`` into consecutive chunks of ``per_round`` items."""
        count = len(values)
        if count % per_round != 0:
            self._raise_wrong_variable_count(per_round, count)
        # Map list of values to list of lists containing values per round.
        return (values[i : i + per_round] for i in range(0, count, per_round))

    def _raise_wrong_variable_count(self, variables, values):
        """Raise ``DataError`` telling that values cannot be split evenly."""
        raise DataError(
            f"Number of FOR loop values should be multiple of its variables. "
            f"Got {variables} variables but {values} value{s(values)}."
        )

    def _run_one_round(self, data, result, assign, types, values, run=True):
        """Assign loop variables for one round and run the loop body once."""
        ctx = self._context
        iter_data = data.get_iteration()
        iter_result = result.body.create_iteration()
        # When not really running, assign into a throwaway dict.
        variables = ctx.variables if run and not ctx.dry_run else {}
        # A single loop variable gets all values of the round as one tuple.
        if len(assign) == 1 and len(values) != 1:
            values = [tuple(values)]
        for orig, name, type_info, value in zip(data.assign, assign, types, values):
            if type_info and not ctx.dry_run:
                value = type_info.convert(value, orig, kind="FOR loop variable")
            variables[name] = value
            iter_data.assign[orig] = value
            iter_result.assign[orig] = cut_assign_value(value)
        runner = BodyRunner(self._context, run, self._templated)
        with StatusReporter(iter_data, iter_result, self._context, run):
            runner.run(iter_data, iter_result)

    def _no_run_one_round(self, data, result):
        """Create one not-run round with empty values for all loop variables."""
        self._run_one_round(
            data,
            result,
            assign=data.assign,
            types=[None] * len(data.assign),
            values=[""] * len(data.assign),
            run=False,
        )
class ForInRangeRunner(ForInRunner):
    """Runner for ``FOR ... IN RANGE ...`` loops."""

    flavor = "IN RANGE"

    def _resolve_dict_values(self, values):
        """Dictionary iteration is not supported by this flavor; always raises."""
        raise DataError(
            "FOR IN RANGE loops do not support iterating over dictionaries.",
            syntax=True,
        )

    def _map_values_to_rounds(self, values, per_round):
        """Convert 1-3 range values to numbers and split the range into rounds."""
        value_count = len(values)
        if value_count < 1 or value_count > 3:
            raise DataError(
                f"FOR IN RANGE expected 1-3 values, got {len(values)}.",
                syntax=True,
            )
        try:
            numbers = [self._to_number_with_arithmetic(item) for item in values]
        except Exception:
            msg = get_error_message()
            raise DataError(f"Converting FOR IN RANGE values failed: {msg}.")
        range_values = frange(*numbers)
        return super()._map_values_to_rounds(range_values, per_round)

    def _to_number_with_arithmetic(self, item):
        """Return ``item`` as a number, evaluating it as an expression if needed."""
        if isinstance(item, (int, float)):
            return item
        # NOTE(review): `eval` is run on the string form of the loop value with
        # empty globals. Loop values come from test data; confirm that is
        # trusted input in every context where this runner is used.
        number = eval(str(item), {})
        if isinstance(number, (int, float)):
            return number
        raise TypeError(f"Expected number, got {type_name(item)}.")
class ForInZipRunner(ForInRunner):
    """Runner for ``FOR ... IN ZIP ...`` loops."""

    flavor = "IN ZIP"
    # Resolved per loop in `_get_values_for_rounds`.
    _mode = None
    _fill = None

    def _get_values_for_rounds(self, data):
        """Resolve the ``mode`` and ``fill`` options before resolving values."""
        self._mode = self._resolve_mode(data.mode)
        self._fill = self._resolve_fill(data.fill)
        return super()._get_values_for_rounds(data)

    def _resolve_mode(self, mode):
        """Return the upper-cased mode, or ``None`` if not given or in dry-run."""
        if not mode or self._context.dry_run:
            return None
        try:
            mode = self._context.variables.replace_string(mode)
            valid = ("STRICT", "SHORTEST", "LONGEST")
            normalized = mode.upper()
            if normalized in valid:
                return normalized
            # Caught below so that the message gets the common prefix.
            raise DataError(
                f"Value '{mode}' is not accepted. Valid values are {seq2str(valid)}."
            )
        except DataError as err:
            raise DataError(f"Invalid FOR IN ZIP mode: {err}")

    def _resolve_fill(self, fill):
        """Return the resolved fill value, or ``None`` if not given or in dry-run."""
        if not fill or self._context.dry_run:
            return None
        try:
            return self._context.variables.replace_scalar(fill)
        except DataError as err:
            raise DataError(f"Invalid FOR IN ZIP fill value: {err}")

    def _resolve_dict_values(self, values):
        """Dictionary iteration is not supported by this flavor; always raises."""
        raise DataError(
            "FOR IN ZIP loops do not support iterating over dictionaries.",
            syntax=True,
        )

    def _map_values_to_rounds(self, values, per_round):
        """Zip the given list-like values together according to the mode."""
        self._validate_types(values)
        item_count = len(values)
        if item_count % per_round != 0:
            self._raise_wrong_variable_count(per_round, item_count)
        mode = self._mode
        if mode == "LONGEST":
            return zip_longest(*values, fillvalue=self._fill)
        if mode == "STRICT":
            self._validate_strict_lengths(values)
        if mode is None:
            self._deprecate_different_lengths(values)
        return zip(*values)

    def _validate_types(self, values):
        """Raise ``DataError`` if any of the items is not list-like."""
        for position, candidate in enumerate(values, start=1):
            if is_list_like(candidate):
                continue
            raise DataError(
                f"FOR IN ZIP items must be list-like, "
                f"but item {position} is {type_name(candidate)}."
            )

    def _validate_strict_lengths(self, values):
        """Raise ``DataError`` if items lack a length or their lengths differ."""
        lengths = []
        for index, item in enumerate(values, start=1):
            try:
                lengths.append(len(item))
            except TypeError:
                raise DataError(
                    f"FOR IN ZIP items must have length in the STRICT mode, "
                    f"but item {index} does not."
                )
        if len(set(lengths)) > 1:
            raise DataError(
                f"FOR IN ZIP items must have equal lengths in the STRICT mode, "
                f"but lengths are {seq2str(lengths, quote='')}."
            )

    def _deprecate_different_lengths(self, values):
        """Log a warning if the items would not pass the STRICT mode validation."""
        try:
            self._validate_strict_lengths(values)
        except DataError as err:
            logger.warn(
                f"FOR IN ZIP default mode will be changed from SHORTEST to STRICT in "
                f"Robot Framework 8.0. Use 'mode=SHORTEST' to keep using the SHORTEST "
                f"mode. If the mode is not changed, execution will fail like this in "
                f"the future: {err}"
            )
class ForInEnumerateRunner(ForInRunner):
    """Runner for ``FOR ... IN ENUMERATE ...`` loops."""

    flavor = "IN ENUMERATE"
    # Index of the first round; resolved per loop in `_get_values_for_rounds`.
    _start = 0

    def _get_values_for_rounds(self, data):
        """Resolve the ``start`` option before resolving values."""
        self._start = self._resolve_start(data.start)
        return super()._get_values_for_rounds(data)

    def _resolve_start(self, start):
        """Return the start index as an integer; ``0`` if not given or in dry-run."""
        if not start or self._context.dry_run:
            return 0
        try:
            start = self._context.variables.replace_string(start)
            try:
                return int(start)
            except ValueError:
                # Caught below so that the message gets the common prefix.
                raise DataError(f"Value must be an integer, got '{start}'.")
        except DataError as err:
            raise DataError(f"Invalid FOR IN ENUMERATE start value: {err}")

    def _map_dict_values_to_rounds(self, values, per_round):
        """Prefix dictionary items with a running index.

        With two loop variables each round is ``(index, item)``; otherwise the
        item is unpacked after the index.
        """
        if per_round > 3:
            raise DataError(
                f"Number of FOR IN ENUMERATE loop variables must be 1-3 "
                f"when iterating over dictionaries, got {per_round}.",
                syntax=True,
            )
        indexed = enumerate(values, start=self._start)
        if per_round == 2:
            return ((index, item) for index, item in indexed)
        return ((index, *item) for index, item in indexed)

    def _map_values_to_rounds(self, values, per_round):
        """Split values into rounds and prefix each round with a running index."""
        # One loop variable is reserved for the index, but at least one
        # value is always consumed per round.
        rounds = super()._map_values_to_rounds(values, max(per_round - 1, 1))
        return ((index, *chunk) for index, chunk in enumerate(rounds, start=self._start))

    def _raise_wrong_variable_count(self, variables, values):
        """Raise ``DataError`` telling that values cannot be split evenly."""
        raise DataError(
            f"Number of FOR IN ENUMERATE loop values should be multiple of its "
            f"variables (excluding the index). Got {variables} variables but "
            f"{values} value{s(values)}."
        )
class WhileRunner:
    """Runner for ``WHILE`` loops."""

    def __init__(self, context, run=True, templated=False):
        self._context = context
        self._run = run
        self._templated = templated

    def run(self, data, result):
        """Run the loop while its condition holds and the limit is not exceeded.

        When the loop is not run (disabled, dry-run, invalid data, falsy
        initial condition or invalid limit), one iteration is still reported
        and a possible error is raised after that.
        """
        ctx = self._context
        error = None
        run = False
        result.start_time = datetime.now()
        iter_result = result.body.create_iteration(start_time=datetime.now())
        if self._run:
            if data.error:
                error = DataError(data.error, syntax=True)
            elif not ctx.dry_run:
                try:
                    run = self._should_run(data.condition, ctx.variables)
                except DataError as err:
                    error = err
        with StatusReporter(data, result, self._context, run):
            iter_data = data.get_iteration()
            if run:
                try:
                    limit = WhileLimit.create(data, ctx.variables)
                except DataError as err:
                    error = err
                    run = False
            if ctx.dry_run or not run:
                self._run_iteration(iter_data, iter_result, run)
                if error:
                    raise error
                return
            failures = []
            while True:
                try:
                    # Entering the limit raises `LimitExceeded` when it is hit.
                    with limit:
                        self._run_iteration(iter_data, iter_result)
                except (BreakLoop, ContinueLoop) as ctrl:
                    if ctrl.earlier_failures:
                        failures.extend(ctrl.earlier_failures.get_errors())
                    if isinstance(ctrl, BreakLoop):
                        break
                except ExecutionPassed as passed:
                    passed.set_earlier_failures(failures)
                    raise passed
                except LimitExceeded as exceeded:
                    if exceeded.on_limit_pass:
                        self._context.info(exceeded.message)
                    else:
                        failures.append(exceeded)
                    break
                except ExecutionFailed as failed:
                    failures.extend(failed.get_errors())
                    if not failed.can_continue(ctx, self._templated):
                        break
                # Prepare the next iteration before re-evaluating the condition.
                iter_result = result.body.create_iteration(start_time=datetime.now())
                iter_data = data.get_iteration()
                if not self._should_run(data.condition, ctx.variables):
                    break
            if failures:
                raise ExecutionFailures(failures)

    def _run_iteration(self, data, result, run=True):
        """Run the loop body once."""
        body_runner = BodyRunner(self._context, run, self._templated)
        with StatusReporter(data, result, self._context, run):
            body_runner.run(data, result)

    def _should_run(self, condition, variables):
        """Evaluate the loop condition; a missing condition is always true."""
        if not condition:
            return True
        try:
            return evaluate_expression(
                condition,
                variables.current,
                resolve_variables=True,
            )
        except Exception:
            msg = get_error_message()
            raise DataError(f"Invalid WHILE loop condition: {msg}")
class GroupRunner:
    """Runner for ``GROUP`` blocks."""

    def __init__(self, context, run=True, templated=False):
        self._context = context
        self._run = run
        self._templated = templated

    def run(self, data, result):
        """Run the group body; a possible initialization error is raised after it."""
        error = None
        run = False
        if self._run:
            error = self._initialize(data, result)
            run = error is None
        with StatusReporter(data, result, self._context, run=run):
            body_runner = BodyRunner(self._context, run, self._templated)
            body_runner.run(data, result)
            if error:
                raise error

    def _initialize(self, data, result):
        """Resolve variables in the group name.

        Returns a ``DataError`` instead of raising it if the group is invalid
        or the name cannot be resolved; returns ``None`` on success.
        """
        if data.error:
            return DataError(data.error, syntax=True)
        variables = self._context.variables
        try:
            result.name = variables.replace_string(result.name)
        except DataError as err:
            return err
        return None
class IfRunner:
    """Runner for ``IF/ELSE IF/ELSE`` structures."""

    # Class-level list, so it is shared by all instances. Holds the dict form
    # of the IF structures currently being dry-run.
    _dry_run_stack = []

    def __init__(self, context, run=True, templated=False):
        self._context = context
        self._run = run
        self._templated = templated

    def run(self, data, result):
        """Run the branches in order; later branches are not run after one is."""
        with self._dry_run_recursion_detection(data) as recursive_dry_run:
            error = None
            with StatusReporter(data, result, self._context, self._run):
                for branch in data.body:
                    try:
                        branch_was_run = self._run_if_branch(
                            branch,
                            result,
                            recursive_dry_run,
                            data.error,
                        )
                        if branch_was_run:
                            self._run = False
                    except ExecutionStatus as err:
                        error = err
                        self._run = False
                if error:
                    raise error

    @contextmanager
    def _dry_run_recursion_detection(self, data):
        """Yield whether ``data`` is already being dry-run.

        Outside dry-run mode this always yields ``False``.
        """
        if not self._context.dry_run:
            yield False
        else:
            data = data.to_dict()
            recursive = data in self._dry_run_stack
            self._dry_run_stack.append(data)
            try:
                yield recursive
            finally:
                self._dry_run_stack.pop()

    def _run_if_branch(self, data, result, recursive_dry_run=False, syntax_error=None):
        """Run a single branch and return whether it was selected to run."""
        context = self._context
        result = result.body.create_branch(
            data.type,
            data.condition,
            start_time=datetime.now(),
        )
        error = None
        if syntax_error:
            run_branch = False
            error = DataError(syntax_error, syntax=True)
        else:
            try:
                run_branch = self._should_run_branch(data, context, recursive_dry_run)
            except DataError as err:
                error = err
                run_branch = False
        with StatusReporter(data, result, context, run_branch):
            body_runner = BodyRunner(context, run_branch, self._templated)
            if not recursive_dry_run:
                body_runner.run(data, result)
            if error and self._run:
                raise error
        return run_branch

    def _should_run_branch(self, data, context, recursive_dry_run=False):
        """Decide whether the branch should be run.

        In dry-run mode every branch is run unless recursion was detected.
        Otherwise a branch runs only when running is enabled and it either
        has no condition or its condition evaluates to a true value.
        """
        if context.dry_run:
            return not recursive_dry_run
        if not self._run:
            return False
        if data.condition is None:
            return True
        try:
            return evaluate_expression(
                data.condition,
                context.variables.current,
                resolve_variables=True,
            )
        except Exception:
            msg = get_error_message()
            raise DataError(f"Invalid {data.type} condition: {msg}")
class TryRunner:
    """Runner for ``TRY/EXCEPT/ELSE/FINALLY`` structures."""

    def __init__(self, context, run=True, templated=False):
        self._context = context
        self._run = run
        self._templated = templated

    def run(self, data, result):
        """Run the TRY branch and then the branches selected by its outcome."""
        run = self._run
        with StatusReporter(data, result, self._context, run):
            if data.error:
                self._run_invalid(data, result)
                return
            error = self._run_try(data, result, run)
            run_handlers = self._should_run_excepts_or_else(error, run)
            if error:
                error = self._run_excepts(data, result, error, run=run_handlers)
                self._run_else(data, result, run=False)
            else:
                self._run_excepts(data, result, error, run=False)
                error = self._run_else(data, result, run=run_handlers)
            # An error from FINALLY takes precedence over an earlier one.
            error = self._run_finally(data, result, run) or error
            if error:
                raise error

    def _run_invalid(self, data, result):
        """Report all branches as not run and fail with the syntax error."""
        error_reported = False
        for branch in data.body:
            branch_result = result.body.create_branch(
                branch.type,
                branch.patterns,
                branch.pattern_type,
                branch.assign,
            )
            with StatusReporter(
                branch,
                branch_result,
                self._context,
                run=False,
                suppress=True,
            ):
                body_runner = BodyRunner(
                    self._context, run=False, templated=self._templated
                )
                body_runner.run(branch, branch_result)
                # The error is raised only inside the first branch's reporter,
                # which is created with `suppress=True`.
                if not error_reported:
                    error_reported = True
                    raise DataError(data.error, syntax=True)
        raise ExecutionFailed(data.error, syntax=True)

    def _run_try(self, data, result, run):
        """Run the TRY branch and return its error, if any."""
        branch_result = result.body.create_branch(data.TRY)
        return self._run_branch(data.try_branch, branch_result, run)

    def _should_run_excepts_or_else(self, error, run):
        """Return whether EXCEPT (on error) or ELSE (on success) may be run.

        Skips, syntax errors and ``ExecutionPassed`` are not handled.
        """
        if not run:
            return False
        if not error:
            return True
        return not (error.skip or error.syntax or isinstance(error, ExecutionPassed))

    def _run_branch(self, data, result, run=True, error=None):
        """Run one branch and return its ``ExecutionStatus`` error or ``None``.

        If ``error`` is given, it is raised inside the status reporter
        instead of running the body.
        """
        try:
            with StatusReporter(data, result, self._context, run):
                if error:
                    raise error
                body_runner = BodyRunner(self._context, run, self._templated)
                body_runner.run(data, result)
        except ExecutionStatus as err:
            return err
        else:
            return None

    def _run_excepts(self, data, result, error, run):
        """Run the first matching EXCEPT branch and report the others as not run.

        Returns the original error if no branch handled it, and otherwise the
        outcome of the branch that was run.
        """
        for branch in data.except_branches:
            try:
                run_branch = run and self._should_run_except(branch, error)
            except DataError as err:
                # An invalid pattern makes the branch itself fail.
                run_branch = True
                pattern_error = err
            else:
                pattern_error = None
            branch_result = result.body.create_branch(
                branch.type,
                branch.patterns,
                branch.pattern_type,
                branch.assign,
            )
            if run_branch:
                if branch.assign:
                    self._context.variables[branch.assign] = str(error)
                error = self._run_branch(branch, branch_result, error=pattern_error)
                # Only one EXCEPT branch is ever run.
                run = False
            else:
                self._run_branch(branch, branch_result, run=False)
        return error

    def _should_run_except(self, branch, error):
        """Return whether the error message matches any pattern of the branch.

        A branch without patterns matches everything.
        """
        if not branch.patterns:
            return True
        matchers = {
            "GLOB": lambda m, p: Matcher(p, spaceless=False, caseless=False).match(m),
            "REGEXP": lambda m, p: re.fullmatch(p, m) is not None,
            "START": lambda m, p: m.startswith(p),
            "LITERAL": lambda m, p: m == p,
        }
        replace_string = self._context.variables.replace_string
        if branch.pattern_type:
            pattern_type = replace_string(branch.pattern_type)
        else:
            pattern_type = "LITERAL"
        matcher = matchers.get(pattern_type.upper())
        if not matcher:
            raise DataError(
                f"Invalid EXCEPT pattern type '{pattern_type}'. "
                f"Valid values are {seq2str(matchers)}."
            )
        for pattern in branch.patterns:
            if matcher(error.message, replace_string(pattern)):
                return True
        return False

    def _run_else(self, data, result, run):
        """Run the ELSE branch, if there is one, and return its error."""
        if data.else_branch:
            branch_result = result.body.create_branch(data.ELSE)
            return self._run_branch(data.else_branch, branch_result, run)

    def _run_finally(self, data, result, run):
        """Run the FINALLY branch, if there is one, and return its error."""
        if data.finally_branch:
            branch_result = result.body.create_branch(data.FINALLY)
            try:
                with StatusReporter(
                    data.finally_branch, branch_result, self._context, run
                ):
                    body_runner = BodyRunner(self._context, run, self._templated)
                    body_runner.run(data.finally_branch, branch_result)
            except ExecutionStatus as err:
                return err
            else:
                return None
class WhileLimit:
    """Base class for WHILE loop limits.

    Limits are used as context managers around each loop iteration.
    Subclasses implement ``__enter__`` and call :meth:`limit_exceeded`
    from it when the limit has been reached.
    """

    def __init__(self, on_limit=None, on_limit_message=None):
        self.on_limit = on_limit
        self.on_limit_message = on_limit_message

    @classmethod
    def create(cls, data, variables):
        """Create the limit configured in ``data``.

        Without a limit, an iteration count limit with the default count is
        used. ``NONE`` (case-insensitive) disables the limit. Other values
        are parsed first as an iteration count and then as a time string.

        Raises ``DataError`` if any of the limit related values is invalid.
        """
        limit = cls._parse_limit(data.limit, variables)
        on_limit = cls._parse_on_limit(data.on_limit, variables)
        on_limit_msg = cls._parse_on_limit_message(data.on_limit_message, variables)
        if not limit:
            return IterationCountLimit(DEFAULT_WHILE_LIMIT, on_limit, on_limit_msg)
        if limit.upper() == "NONE":
            return NoLimit()
        try:
            count = cls._parse_limit_as_count(limit)
        except ValueError:
            # Not an integer count, so try to interpret it as a duration.
            seconds = cls._parse_limit_as_timestr(limit)
            return DurationLimit(seconds, on_limit, on_limit_msg)
        else:
            return IterationCountLimit(count, on_limit, on_limit_msg)

    @classmethod
    def _parse_limit(cls, limit, variables):
        """Return ``limit`` with variables replaced, or ``None`` if not given."""
        if not limit:
            return None
        try:
            return variables.replace_string(limit)
        except DataError as err:
            raise DataError(f"Invalid WHILE loop limit: {err}")

    @classmethod
    def _parse_on_limit(cls, on_limit, variables):
        """Return ``on_limit`` as ``'PASS'`` or ``'FAIL'``, or ``None`` if not given."""
        if not on_limit:
            return None
        try:
            on_limit = variables.replace_string(on_limit)
            if on_limit.upper() in ("PASS", "FAIL"):
                return on_limit.upper()
            # Caught below so that the message gets the common prefix.
            raise DataError(
                f"Value '{on_limit}' is not accepted. Valid values are "
                f"'PASS' and 'FAIL'."
            )
        except DataError as err:
            raise DataError(f"Invalid WHILE loop 'on_limit': {err}")

    @classmethod
    def _parse_on_limit_message(cls, on_limit_message, variables):
        """Return the message with variables replaced, or ``None`` if not given."""
        if not on_limit_message:
            return None
        try:
            return variables.replace_string(on_limit_message)
        except DataError as err:
            # No quote before the error: it had no closing pair, and the
            # other 'Invalid WHILE loop ...' messages do not quote the error.
            raise DataError(f"Invalid WHILE loop 'on_limit_message': {err}")

    @classmethod
    def _parse_limit_as_count(cls, limit):
        """Parse ``limit`` as an iteration count with optional ``times``/``x`` suffix.

        Raises ``ValueError`` if the value is not an integer and ``DataError``
        if the integer is not positive.
        """
        limit = normalize(limit)
        if limit.endswith("times"):
            limit = limit[:-5]
        elif limit.endswith("x"):
            limit = limit[:-1]
        count = int(limit)
        if count <= 0:
            raise DataError(
                f"Invalid WHILE loop limit: Iteration count must be a positive "
                f"integer, got '{count}'."
            )
        return count

    @classmethod
    def _parse_limit_as_timestr(cls, limit):
        """Parse ``limit`` as a time string and return it as seconds."""
        try:
            return timestr_to_secs(limit)
        except ValueError as err:
            raise DataError(f"Invalid WHILE loop limit: {err.args[0]}")

    def limit_exceeded(self):
        """Raise ``LimitExceeded`` with the custom message or a default one."""
        if self.on_limit_message:
            message = self.on_limit_message
        else:
            message = (
                f"WHILE loop was aborted because it did not finish within the limit "
                f"of {self}. Use the 'limit' argument to increase or remove the limit "
                f"if needed."
            )
        raise LimitExceeded(self.on_limit == "PASS", message)

    def __enter__(self):
        raise NotImplementedError

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None means exceptions raised in the loop body propagate.
        return None
class DurationLimit(WhileLimit):
    """Limit based on the elapsed time since the first iteration started."""

    def __init__(self, max_time, on_limit, on_limit_message):
        super().__init__(on_limit, on_limit_message)
        self.max_time = max_time
        self.start_time = None

    def __enter__(self):
        # The clock starts when the first iteration is entered.
        if not self.start_time:
            self.start_time = time.time()
        elapsed = time.time() - self.start_time
        if elapsed > self.max_time:
            self.limit_exceeded()

    def __str__(self):
        return secs_to_timestr(self.max_time)
class IterationCountLimit(WhileLimit):
    """Limit based on the number of started iterations."""

    def __init__(self, max_iterations, on_limit, on_limit_message):
        super().__init__(on_limit, on_limit_message)
        self.max_iterations = max_iterations
        self.current_iterations = 0

    def __enter__(self):
        # Check before counting, so exactly `max_iterations` iterations may start.
        limit_reached = self.current_iterations >= self.max_iterations
        if limit_reached:
            self.limit_exceeded()
        self.current_iterations += 1

    def __str__(self):
        return f"{self.max_iterations} iterations"
class NoLimit(WhileLimit):
    """Limit that is never exceeded."""

    def __enter__(self):
        # Nothing to check.
        return None
class LimitExceeded(ExecutionFailed):
    """Raised when a WHILE loop limit is exceeded.

    ``on_limit_pass`` tells whether exceeding the limit should be handled
    as a pass instead of a failure.
    """

    def __init__(self, on_limit_pass, message):
        super().__init__(message)
        self.on_limit_pass = on_limit_pass