# Copyright 2008-2015 Nokia Networks
# Copyright 2016- Robot Framework Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import time
from collections import OrderedDict
from contextlib import contextmanager
from datetime import datetime
from itertools import zip_longest
from robot.errors import (
BreakLoop, ContinueLoop, DataError, ExecutionFailed, ExecutionFailures,
ExecutionPassed, ExecutionStatus
)
from robot.output import librarylogger as logger
from robot.utils import (
cut_assign_value, frange, get_error_message, is_list_like, Matcher, normalize,
plural_or_not as s, secs_to_timestr, seq2str, split_from_equals, timestr_to_secs,
type_name
)
from robot.variables import evaluate_expression, is_dict_variable, search_variable
from .statusreporter import StatusReporter
DEFAULT_WHILE_LIMIT = 10_000
[docs]
class BodyRunner:
def __init__(self, context, run=True, templated=False):
self._context = context
self._run = run
self._templated = templated
[docs]
def run(self, data, result):
errors = []
passed = None
for item in data.body:
try:
item.run(result, self._context, self._run, self._templated)
except ExecutionPassed as exception:
exception.set_earlier_failures(errors)
passed = exception
self._run = False
except ExecutionFailed as exception:
errors.extend(exception.get_errors())
self._run = exception.can_continue(self._context, self._templated)
if passed:
raise passed
if errors and self._templated:
errors = self._handle_skip_with_templates(errors, result)
if errors:
raise ExecutionFailures(errors)
def _handle_skip_with_templates(self, errors, result):
iterations = result.body.filter(messages=False)
if len(iterations) < 2 or not any(e.skip for e in errors):
return errors
if all(i.skipped for i in iterations):
raise ExecutionFailed("All iterations skipped.", skip=True)
return [e for e in errors if not e.skip]
[docs]
class KeywordRunner:
def __init__(self, context, run=True):
self._context = context
self._run = run
[docs]
def run(self, data, result, setup_or_teardown=False):
context = self._context
if setup_or_teardown:
runner = self._get_setup_teardown_runner(data, context)
else:
runner = context.get_runner(data.name, recommend_on_failure=self._run)
if not runner:
return None
if context.dry_run:
return runner.dry_run(data, result, context)
return runner.run(data, result, context, self._run)
def _get_setup_teardown_runner(self, data, context):
try:
name = context.variables.replace_string(data.name)
except DataError as err:
if context.dry_run:
return None
raise ExecutionFailed(err.message)
if name.upper() in ("NONE", ""):
return None
# If the matched runner accepts embedded arguments, use the original name
# instead of the one where variables are already replaced and converted to
# strings. This allows using non-string values as embedded arguments also
# in this context. An exact match after variables have been replaced has
# a precedence over a possible embedded match with the original name, though.
# BuiltIn.run_keyword has the same logic.
runner = context.get_runner(name, recommend_on_failure=self._run)
if hasattr(runner, "embedded_args") and name != data.name:
runner = context.get_runner(data.name)
return runner
[docs]
def ForRunner(context, flavor="IN", run=True, templated=False):
runners = {
"IN": ForInRunner,
"IN RANGE": ForInRangeRunner,
"IN ZIP": ForInZipRunner,
"IN ENUMERATE": ForInEnumerateRunner,
}
runner = runners[flavor or "IN"]
return runner(context, run, templated)
[docs]
class ForInRunner:
flavor = "IN"
def __init__(self, context, run=True, templated=False):
self._context = context
self._run = run
self._templated = templated
[docs]
def run(self, data, result):
error = None
run = False
if self._run:
if data.error:
error = DataError(data.error, syntax=True)
else:
run = True
with StatusReporter(data, result, self._context, run) as status:
if run:
try:
assign, types = self._split_types(data)
values_for_rounds = self._get_values_for_rounds(data)
except DataError as err:
error = err
else:
if self._run_loop(data, result, assign, types, values_for_rounds):
return
status.pass_status = result.NOT_RUN
self._no_run_one_round(data, result)
if error:
raise error
def _run_loop(self, data, result, assign, types, values_for_rounds):
errors = []
executed = False
for values in values_for_rounds:
executed = True
try:
self._run_one_round(data, result, assign, types, values)
except (BreakLoop, ContinueLoop) as ctrl:
if ctrl.earlier_failures:
errors.extend(ctrl.earlier_failures.get_errors())
if isinstance(ctrl, BreakLoop):
break
except ExecutionPassed as passed:
passed.set_earlier_failures(errors)
raise passed
except ExecutionFailed as failed:
errors.extend(failed.get_errors())
if not failed.can_continue(self._context, self._templated):
break
if errors:
if self._templated and len(errors) > 1 and all(e.skip for e in errors):
raise ExecutionFailed("All iterations skipped.", skip=True)
raise ExecutionFailures(errors)
return executed
def _split_types(self, data):
from .arguments import TypeInfo
assign = []
types = []
for variable in data.assign:
match = search_variable(variable, parse_type=True)
assign.append(match.name)
try:
types.append(TypeInfo.from_variable(match) if match.type else None)
except DataError as err:
raise DataError(f"Invalid FOR loop variable '{variable}': {err}")
return assign, types
def _get_values_for_rounds(self, data):
if self._context.dry_run:
return [[""] * len(data.assign)]
values_per_round = len(data.assign)
if self._is_dict_iteration(data.values):
values = self._resolve_dict_values(data.values)
values = self._map_dict_values_to_rounds(values, values_per_round)
else:
values = self._resolve_values(data.values)
values = self._map_values_to_rounds(values, values_per_round)
return values
def _is_dict_iteration(self, values):
all_name_value = True
for item in values:
if is_dict_variable(item):
return True
if split_from_equals(item)[1] is None:
all_name_value = False
if all_name_value and values:
name, value = split_from_equals(values[0])
logger.warn(
f"FOR loop iteration over values that are all in 'name=value' format "
f"like '{values[0]}' is deprecated. In the future this syntax will "
f"mean iterating over names and values separately like when iterating "
f"over '&{{dict}} variables. Escape at least one of the values like "
f"'{name}\\={value}' to use normal FOR loop iteration and to disable "
f"this warning."
)
return False
def _resolve_dict_values(self, values):
result = OrderedDict()
replace_scalar = self._context.variables.replace_scalar
for item in values:
if is_dict_variable(item):
result.update(replace_scalar(item))
else:
key, value = split_from_equals(item)
if value is None:
raise DataError(
f"Invalid FOR loop value '{item}'. When iterating "
f"over dictionaries, values must be '&{{dict}}' "
f"variables or use 'key=value' syntax.",
syntax=True,
)
try:
result[replace_scalar(key)] = replace_scalar(value)
except TypeError:
err = get_error_message()
raise DataError(f"Invalid dictionary item '{item}': {err}")
return result.items()
def _map_dict_values_to_rounds(self, values, per_round):
if per_round > 2:
raise DataError(
f"Number of FOR loop variables must be 1 or 2 when iterating "
f"over dictionaries, got {per_round}.",
syntax=True,
)
return values
def _resolve_values(self, values):
return self._context.variables.replace_list(values)
def _map_values_to_rounds(self, values, per_round):
count = len(values)
if count % per_round != 0:
self._raise_wrong_variable_count(per_round, count)
# Map list of values to list of lists containing values per round.
return (values[i : i + per_round] for i in range(0, count, per_round))
def _raise_wrong_variable_count(self, variables, values):
raise DataError(
f"Number of FOR loop values should be multiple of its variables. "
f"Got {variables} variables but {values} value{s(values)}."
)
def _run_one_round(self, data, result, assign, types, values, run=True):
ctx = self._context
iter_data = data.get_iteration()
iter_result = result.body.create_iteration()
variables = ctx.variables if run and not ctx.dry_run else {}
if len(assign) == 1 and len(values) != 1:
values = [tuple(values)]
for orig, name, type_info, value in zip(data.assign, assign, types, values):
if type_info and not ctx.dry_run:
value = type_info.convert(value, orig, kind="FOR loop variable")
variables[name] = value
iter_data.assign[orig] = value
iter_result.assign[orig] = cut_assign_value(value)
runner = BodyRunner(self._context, run, self._templated)
with StatusReporter(iter_data, iter_result, self._context, run):
runner.run(iter_data, iter_result)
def _no_run_one_round(self, data, result):
self._run_one_round(
data,
result,
assign=data.assign,
types=[None] * len(data.assign),
values=[""] * len(data.assign),
run=False,
)
[docs]
class ForInRangeRunner(ForInRunner):
flavor = "IN RANGE"
def _resolve_dict_values(self, values):
raise DataError(
"FOR IN RANGE loops do not support iterating over dictionaries.",
syntax=True,
)
def _map_values_to_rounds(self, values, per_round):
if not 1 <= len(values) <= 3:
raise DataError(
f"FOR IN RANGE expected 1-3 values, got {len(values)}.",
syntax=True,
)
try:
values = [self._to_number_with_arithmetic(v) for v in values]
except Exception:
msg = get_error_message()
raise DataError(f"Converting FOR IN RANGE values failed: {msg}.")
values = frange(*values)
return super()._map_values_to_rounds(values, per_round)
def _to_number_with_arithmetic(self, item):
if isinstance(item, (int, float)):
return item
number = eval(str(item), {})
if not isinstance(number, (int, float)):
raise TypeError(f"Expected number, got {type_name(item)}.")
return number
[docs]
class ForInZipRunner(ForInRunner):
flavor = "IN ZIP"
_mode = None
_fill = None
def _get_values_for_rounds(self, data):
self._mode = self._resolve_mode(data.mode)
self._fill = self._resolve_fill(data.fill)
return super()._get_values_for_rounds(data)
def _resolve_mode(self, mode):
if not mode or self._context.dry_run:
return None
try:
mode = self._context.variables.replace_string(mode)
valid = ("STRICT", "SHORTEST", "LONGEST")
if mode.upper() in valid:
return mode.upper()
raise DataError(
f"Value '{mode}' is not accepted. Valid values are {seq2str(valid)}."
)
except DataError as err:
raise DataError(f"Invalid FOR IN ZIP mode: {err}")
def _resolve_fill(self, fill):
if not fill or self._context.dry_run:
return None
try:
return self._context.variables.replace_scalar(fill)
except DataError as err:
raise DataError(f"Invalid FOR IN ZIP fill value: {err}")
def _resolve_dict_values(self, values):
raise DataError(
"FOR IN ZIP loops do not support iterating over dictionaries.",
syntax=True,
)
def _map_values_to_rounds(self, values, per_round):
self._validate_types(values)
if len(values) % per_round != 0:
self._raise_wrong_variable_count(per_round, len(values))
if self._mode == "LONGEST":
return zip_longest(*values, fillvalue=self._fill)
if self._mode == "STRICT":
self._validate_strict_lengths(values)
if self._mode is None:
self._deprecate_different_lengths(values)
return zip(*values)
def _validate_types(self, values):
for index, item in enumerate(values, start=1):
if not is_list_like(item):
raise DataError(
f"FOR IN ZIP items must be list-like, "
f"but item {index} is {type_name(item)}."
)
def _validate_strict_lengths(self, values):
lengths = []
for index, item in enumerate(values, start=1):
try:
lengths.append(len(item))
except TypeError:
raise DataError(
f"FOR IN ZIP items must have length in the STRICT mode, "
f"but item {index} does not."
)
if len(set(lengths)) > 1:
raise DataError(
f"FOR IN ZIP items must have equal lengths in the STRICT mode, "
f"but lengths are {seq2str(lengths, quote='')}."
)
def _deprecate_different_lengths(self, values):
try:
self._validate_strict_lengths(values)
except DataError as err:
logger.warn(
f"FOR IN ZIP default mode will be changed from SHORTEST to STRICT in "
f"Robot Framework 8.0. Use 'mode=SHORTEST' to keep using the SHORTEST "
f"mode. If the mode is not changed, execution will fail like this in "
f"the future: {err}"
)
[docs]
class ForInEnumerateRunner(ForInRunner):
flavor = "IN ENUMERATE"
_start = 0
def _get_values_for_rounds(self, data):
self._start = self._resolve_start(data.start)
return super()._get_values_for_rounds(data)
def _resolve_start(self, start):
if not start or self._context.dry_run:
return 0
try:
start = self._context.variables.replace_string(start)
try:
return int(start)
except ValueError:
raise DataError(f"Value must be an integer, got '{start}'.")
except DataError as err:
raise DataError(f"Invalid FOR IN ENUMERATE start value: {err}")
def _map_dict_values_to_rounds(self, values, per_round):
if per_round > 3:
raise DataError(
f"Number of FOR IN ENUMERATE loop variables must be 1-3 "
f"when iterating over dictionaries, got {per_round}.",
syntax=True,
)
if per_round == 2:
return ((i, v) for i, v in enumerate(values, start=self._start))
return ((i, *v) for i, v in enumerate(values, start=self._start))
def _map_values_to_rounds(self, values, per_round):
values = super()._map_values_to_rounds(values, max(per_round - 1, 1))
return ((i, *v) for i, v in enumerate(values, start=self._start))
def _raise_wrong_variable_count(self, variables, values):
raise DataError(
f"Number of FOR IN ENUMERATE loop values should be multiple of its "
f"variables (excluding the index). Got {variables} variables but "
f"{values} value{s(values)}."
)
[docs]
class WhileRunner:
def __init__(self, context, run=True, templated=False):
self._context = context
self._run = run
self._templated = templated
[docs]
def run(self, data, result):
ctx = self._context
error = None
run = False
result.start_time = datetime.now()
iter_result = result.body.create_iteration(start_time=datetime.now())
if self._run:
if data.error:
error = DataError(data.error, syntax=True)
elif not ctx.dry_run:
try:
run = self._should_run(data.condition, ctx.variables)
except DataError as err:
error = err
with StatusReporter(data, result, self._context, run):
iter_data = data.get_iteration()
if run:
try:
limit = WhileLimit.create(data, ctx.variables)
except DataError as err:
error = err
run = False
if ctx.dry_run or not run:
self._run_iteration(iter_data, iter_result, run)
if error:
raise error
return
errors = []
while True:
try:
with limit:
self._run_iteration(iter_data, iter_result)
except (BreakLoop, ContinueLoop) as ctrl:
if ctrl.earlier_failures:
errors.extend(ctrl.earlier_failures.get_errors())
if isinstance(ctrl, BreakLoop):
break
except ExecutionPassed as passed:
passed.set_earlier_failures(errors)
raise passed
except LimitExceeded as exceeded:
if exceeded.on_limit_pass:
self._context.info(exceeded.message)
else:
errors.append(exceeded)
break
except ExecutionFailed as failed:
errors.extend(failed.get_errors())
if not failed.can_continue(ctx, self._templated):
break
iter_result = result.body.create_iteration(start_time=datetime.now())
iter_data = data.get_iteration()
if not self._should_run(data.condition, ctx.variables):
break
if errors:
raise ExecutionFailures(errors)
def _run_iteration(self, data, result, run=True):
runner = BodyRunner(self._context, run, self._templated)
with StatusReporter(data, result, self._context, run):
runner.run(data, result)
def _should_run(self, condition, variables):
if not condition:
return True
try:
return evaluate_expression(
condition,
variables.current,
resolve_variables=True,
)
except Exception:
msg = get_error_message()
raise DataError(f"Invalid WHILE loop condition: {msg}")
[docs]
class GroupRunner:
def __init__(self, context, run=True, templated=False):
self._context = context
self._run = run
self._templated = templated
[docs]
def run(self, data, result):
if self._run:
error = self._initialize(data, result)
run = error is None
else:
error = None
run = False
with StatusReporter(data, result, self._context, run=run):
runner = BodyRunner(self._context, run, self._templated)
runner.run(data, result)
if error:
raise error
def _initialize(self, data, result):
if data.error:
return DataError(data.error, syntax=True)
try:
result.name = self._context.variables.replace_string(result.name)
except DataError as err:
return err
return None
[docs]
class IfRunner:
_dry_run_stack = []
def __init__(self, context, run=True, templated=False):
self._context = context
self._run = run
self._templated = templated
[docs]
def run(self, data, result):
with self._dry_run_recursion_detection(data) as recursive_dry_run:
error = None
with StatusReporter(data, result, self._context, self._run):
for branch in data.body:
try:
if self._run_if_branch(
branch,
result,
recursive_dry_run,
data.error,
):
self._run = False
except ExecutionStatus as err:
error = err
self._run = False
if error:
raise error
@contextmanager
def _dry_run_recursion_detection(self, data):
if not self._context.dry_run:
yield False
else:
data = data.to_dict()
recursive = data in self._dry_run_stack
self._dry_run_stack.append(data)
try:
yield recursive
finally:
self._dry_run_stack.pop()
def _run_if_branch(self, data, result, recursive_dry_run=False, syntax_error=None):
context = self._context
result = result.body.create_branch(
data.type,
data.condition,
start_time=datetime.now(),
)
error = None
if syntax_error:
run_branch = False
error = DataError(syntax_error, syntax=True)
else:
try:
run_branch = self._should_run_branch(data, context, recursive_dry_run)
except DataError as err:
error = err
run_branch = False
with StatusReporter(data, result, context, run_branch):
runner = BodyRunner(context, run_branch, self._templated)
if not recursive_dry_run:
runner.run(data, result)
if error and self._run:
raise error
return run_branch
def _should_run_branch(self, data, context, recursive_dry_run=False):
if context.dry_run:
return not recursive_dry_run
if not self._run:
return False
if data.condition is None:
return True
try:
return evaluate_expression(
data.condition,
context.variables.current,
resolve_variables=True,
)
except Exception:
msg = get_error_message()
raise DataError(f"Invalid {data.type} condition: {msg}")
[docs]
class TryRunner:
def __init__(self, context, run=True, templated=False):
self._context = context
self._run = run
self._templated = templated
[docs]
def run(self, data, result):
run = self._run
with StatusReporter(data, result, self._context, run):
if data.error:
self._run_invalid(data, result)
return
error = self._run_try(data, result, run)
run_excepts_or_else = self._should_run_excepts_or_else(error, run)
if error:
error = self._run_excepts(data, result, error, run=run_excepts_or_else)
self._run_else(data, result, run=False)
else:
self._run_excepts(data, result, error, run=False)
error = self._run_else(data, result, run=run_excepts_or_else)
error = self._run_finally(data, result, run) or error
if error:
raise error
def _run_invalid(self, data, result):
error_reported = False
for branch in data.body:
branch_result = result.body.create_branch(
branch.type,
branch.patterns,
branch.pattern_type,
branch.assign,
)
with StatusReporter(
branch,
branch_result,
self._context,
run=False,
suppress=True,
):
runner = BodyRunner(self._context, run=False, templated=self._templated)
runner.run(branch, branch_result)
if not error_reported:
error_reported = True
raise DataError(data.error, syntax=True)
raise ExecutionFailed(data.error, syntax=True)
def _run_try(self, data, result, run):
result = result.body.create_branch(data.TRY)
return self._run_branch(data.try_branch, result, run)
def _should_run_excepts_or_else(self, error, run):
if not run:
return False
if not error:
return True
return not (error.skip or error.syntax or isinstance(error, ExecutionPassed))
def _run_branch(self, data, result, run=True, error=None):
try:
with StatusReporter(data, result, self._context, run):
if error:
raise error
runner = BodyRunner(self._context, run, self._templated)
runner.run(data, result)
except ExecutionStatus as err:
return err
else:
return None
def _run_excepts(self, data, result, error, run):
for branch in data.except_branches:
try:
run_branch = run and self._should_run_except(branch, error)
except DataError as err:
run_branch = True
pattern_error = err
else:
pattern_error = None
branch_result = result.body.create_branch(
branch.type,
branch.patterns,
branch.pattern_type,
branch.assign,
)
if run_branch:
if branch.assign:
self._context.variables[branch.assign] = str(error)
error = self._run_branch(branch, branch_result, error=pattern_error)
run = False
else:
self._run_branch(branch, branch_result, run=False)
return error
def _should_run_except(self, branch, error):
if not branch.patterns:
return True
matchers = {
"GLOB": lambda m, p: Matcher(p, spaceless=False, caseless=False).match(m),
"REGEXP": lambda m, p: re.fullmatch(p, m) is not None,
"START": lambda m, p: m.startswith(p),
"LITERAL": lambda m, p: m == p,
}
if branch.pattern_type:
pattern_type = self._context.variables.replace_string(branch.pattern_type)
else:
pattern_type = "LITERAL"
matcher = matchers.get(pattern_type.upper())
if not matcher:
raise DataError(
f"Invalid EXCEPT pattern type '{pattern_type}'. "
f"Valid values are {seq2str(matchers)}."
)
for pattern in branch.patterns:
if matcher(error.message, self._context.variables.replace_string(pattern)):
return True
return False
def _run_else(self, data, result, run):
if data.else_branch:
result = result.body.create_branch(data.ELSE)
return self._run_branch(data.else_branch, result, run)
def _run_finally(self, data, result, run):
if data.finally_branch:
result = result.body.create_branch(data.FINALLY)
try:
with StatusReporter(data.finally_branch, result, self._context, run):
runner = BodyRunner(self._context, run, self._templated)
runner.run(data.finally_branch, result)
except ExecutionStatus as err:
return err
else:
return None
[docs]
class WhileLimit:
def __init__(self, on_limit=None, on_limit_message=None):
self.on_limit = on_limit
self.on_limit_message = on_limit_message
[docs]
@classmethod
def create(cls, data, variables):
limit = cls._parse_limit(data.limit, variables)
on_limit = cls._parse_on_limit(data.on_limit, variables)
on_limit_msg = cls._parse_on_limit_message(data.on_limit_message, variables)
if not limit:
return IterationCountLimit(DEFAULT_WHILE_LIMIT, on_limit, on_limit_msg)
if limit.upper() == "NONE":
return NoLimit()
try:
count = cls._parse_limit_as_count(limit)
except ValueError:
seconds = cls._parse_limit_as_timestr(limit)
return DurationLimit(seconds, on_limit, on_limit_msg)
else:
return IterationCountLimit(count, on_limit, on_limit_msg)
@classmethod
def _parse_limit(cls, limit, variables):
if not limit:
return None
try:
return variables.replace_string(limit)
except DataError as err:
raise DataError(f"Invalid WHILE loop limit: {err}")
@classmethod
def _parse_on_limit(cls, on_limit, variables):
if not on_limit:
return None
try:
on_limit = variables.replace_string(on_limit)
if on_limit.upper() in ("PASS", "FAIL"):
return on_limit.upper()
raise DataError(
f"Value '{on_limit}' is not accepted. Valid values are "
f"'PASS' and 'FAIL'."
)
except DataError as err:
raise DataError(f"Invalid WHILE loop 'on_limit': {err}")
@classmethod
def _parse_on_limit_message(cls, on_limit_message, variables):
if not on_limit_message:
return None
try:
return variables.replace_string(on_limit_message)
except DataError as err:
raise DataError(f"Invalid WHILE loop 'on_limit_message': '{err}")
@classmethod
def _parse_limit_as_count(cls, limit):
limit = normalize(limit)
if limit.endswith("times"):
limit = limit[:-5]
elif limit.endswith("x"):
limit = limit[:-1]
count = int(limit)
if count <= 0:
raise DataError(
f"Invalid WHILE loop limit: Iteration count must be a positive "
f"integer, got '{count}'."
)
return count
@classmethod
def _parse_limit_as_timestr(cls, limit):
try:
return timestr_to_secs(limit)
except ValueError as err:
raise DataError(f"Invalid WHILE loop limit: {err.args[0]}")
[docs]
def limit_exceeded(self):
if self.on_limit_message:
message = self.on_limit_message
else:
message = (
f"WHILE loop was aborted because it did not finish within the limit "
f"of {self}. Use the 'limit' argument to increase or remove the limit "
f"if needed."
)
raise LimitExceeded(self.on_limit == "PASS", message)
def __enter__(self):
raise NotImplementedError
def __exit__(self, exc_type, exc_val, exc_tb):
return None
[docs]
class DurationLimit(WhileLimit):
def __init__(self, max_time, on_limit, on_limit_message):
super().__init__(on_limit, on_limit_message)
self.max_time = max_time
self.start_time = None
def __enter__(self):
if not self.start_time:
self.start_time = time.time()
if time.time() - self.start_time > self.max_time:
self.limit_exceeded()
def __str__(self):
return secs_to_timestr(self.max_time)
[docs]
class IterationCountLimit(WhileLimit):
def __init__(self, max_iterations, on_limit, on_limit_message):
super().__init__(on_limit, on_limit_message)
self.max_iterations = max_iterations
self.current_iterations = 0
def __enter__(self):
if self.current_iterations >= self.max_iterations:
self.limit_exceeded()
self.current_iterations += 1
def __str__(self):
return f"{self.max_iterations} iterations"
[docs]
class NoLimit(WhileLimit):
def __enter__(self):
pass
[docs]
class LimitExceeded(ExecutionFailed):
def __init__(self, on_limit_pass, message):
super().__init__(message)
self.on_limit_pass = on_limit_pass