# Copyright 2008-2015 Nokia Networks
# Copyright 2016- Robot Framework Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from collections.abc import MutableSequence
from robot.errors import (
DataError, ExecutionStatus, HandlerExecutionFailed, VariableError
)
from robot.utils import (
DotDict, ErrorDetails, format_assign_message, get_error_message, is_dict_like,
is_list_like, prepr, type_name
)
from .search import search_variable
class VariableAssignment:
    """Validated collection of the variables a return value is assigned to.

    The given assignment is passed through ``AssignmentValidator`` when the
    object is created. Problems found at that point are only stored in
    ``errors``; they are raised when ``validate_assignment`` or ``assigner``
    is called.
    """

    def __init__(self, assignment):
        checker = AssignmentValidator()
        normalized = checker.validate(assignment)
        # Dict keys are unique and keep insertion order, so this removes
        # duplicate errors without reordering them.
        unique_errors = dict.fromkeys(checker.errors)
        self.assignment = normalized
        self.errors = tuple(unique_errors)

    def __len__(self):
        return len(self.assignment)

    def __iter__(self):
        return iter(self.assignment)

    def validate_assignment(self):
        """Raise ``DataError`` (with ``syntax=True``) if validation found errors.

        A single error is reported as-is. Several errors are combined into
        one message that starts with a ``Multiple errors:`` header.
        """
        problems = self.errors
        if not problems:
            return
        if len(problems) == 1:
            message = problems[0]
        else:
            message = "\n- ".join(["Multiple errors:", *problems])
        raise DataError(message, syntax=True)

    def assigner(self, context):
        """Return a ``VariableAssigner`` for this assignment and ``context``.

        The assignment is validated first, so this raises the same error as
        ``validate_assignment`` when the assignment is invalid.
        """
        self.validate_assignment()
        return VariableAssigner(self.assignment, context)
class AssignmentValidator:
    """Validate the variables on the left-hand side of an assignment.

    Variables are validated one at a time, in order. Problems are not raised
    but collected into ``errors``; the same message may be added more than
    once. The ``seen_*`` attributes are boolean flags describing what has
    been encountered so far.
    """

    def __init__(self):
        self.seen_list = False
        self.seen_dict = False
        self.seen_any = False
        self.seen_mark = False
        self.errors = []

    def validate(self, assignment):
        """Validate all variables and return them without the assign mark.

        :param assignment: Iterable of variable strings such as ``${x}``,
            ``@{y}`` or ``${z} =``.
        :return: List of the variables with a trailing ``=`` (and the
            whitespace before it) removed.
        """
        return [self._validate(var) for var in assignment]

    def _validate(self, variable):
        variable = self._validate_assign_mark(variable)
        self._validate_state(is_list=variable[0] == "@", is_dict=variable[0] == "&")
        return variable

    def _validate_assign_mark(self, variable):
        # If an earlier variable already ended with '=', any variable that
        # follows it makes the assignment invalid.
        if self.seen_mark:
            self.errors.append(
                "Assign mark '=' can be used only with the last variable.",
            )
        if variable[-1] == "=":
            self.seen_mark = True
            return variable[:-1].rstrip()
        return variable

    def _validate_state(self, is_list, is_dict):
        if is_list and self.seen_list:
            self.errors.append(
                "Assignment can contain only one list variable.",
            )
        # A dictionary variable must be the only variable: it is an error if
        # a dict was seen earlier, or if this is a dict and anything at all
        # preceded it. Parentheses only make the precedence explicit.
        if self.seen_dict or (is_dict and self.seen_any):
            self.errors.append(
                "Dictionary variable cannot be assigned with other variables.",
            )
        # Keep the flags as real booleans. Using '+=' here would silently
        # turn them into integers (False + True == 1).
        self.seen_list = self.seen_list or is_list
        self.seen_dict = self.seen_dict or is_dict
        self.seen_any = True
class VariableAssigner:
    """Assign a return value to the variables of a validated assignment.

    Supports normal assignment (``${x}``), extended attribute assignment
    (``${obj.attr}``) and item assignment (``${x}[key]``).

    Can be used as a context manager. If the ``with`` block exits with an
    error for which ``can_continue(context)`` is true, the return value
    carried by that error is assigned. ``__exit__`` never returns a true
    value, so the error itself is not suppressed.
    """

    # Attribute names accepted by extended assignment.
    _valid_extended_attr = re.compile(r"^[_a-zA-Z]\w*$")

    def __init__(self, assignment, context):
        self._assignment = assignment
        self._context = context

    def __enter__(self):
        return self

    def __exit__(self, etype, error, tb):
        if error is None:
            return
        # Wrap other errors so that `can_continue` and `return_value` are
        # available below.
        if not isinstance(error, ExecutionStatus):
            error = HandlerExecutionFailed(ErrorDetails(error))
        if error.can_continue(self._context):
            self.assign(error.return_value)

    def assign(self, return_value):
        """Assign ``return_value`` to the variables of this assignment.

        The value is traced, split between the variables by a
        ``ReturnValueResolver`` and then set one variable at a time. An info
        message is logged for each assigned variable.
        """
        context = self._context
        context.output.trace(
            lambda: f"Return: {prepr(return_value)}", write_if_flat=False
        )
        resolver = ReturnValueResolver.from_assignment(self._assignment)
        for name, items, value in resolver.resolve(return_value):
            if items:
                value = self._item_assign(name, items, value, context.variables)
            elif not self._extended_assign(name, value, context.variables):
                value = self._normal_assign(name, value, context.variables)
            context.info(format_assign_message(name, value, items))

    def _extended_assign(self, name, value, variables):
        """Try to set ``value`` as an attribute of an existing variable.

        Returns ``False`` when extended assignment does not apply, so that
        the caller can fall back to normal assignment. Raises
        ``VariableError`` if setting the attribute fails.
        """
        # A variable whose own name contains a dot is assigned normally.
        if "." not in name or name in variables:
            return False
        # `name[2:-1]` drops the two leading and one trailing decoration
        # characters; the part after the last dot is the attribute.
        base, attr = [token.strip() for token in name[2:-1].rsplit(".", 1)]
        try:
            var = variables.replace_scalar(f"${{{base}}}")
        except VariableError:
            return False
        if not (
            self._variable_supports_extended_assign(var)
            and self._is_valid_extended_attribute(attr)
        ):
            return False
        try:
            setattr(var, attr, self._handle_list_and_dict(value, name[0]))
        except Exception:
            # NOTE(review): `get_error_message` presumably describes the
            # exception being handled, hence no explicit chaining -- confirm.
            raise VariableError(f"Setting '{name}' failed: {get_error_message()}")
        return True

    def _variable_supports_extended_assign(self, var):
        return not isinstance(var, (str, int, float))

    def _is_valid_extended_attribute(self, attr):
        return self._valid_extended_attr.match(attr) is not None

    def _parse_sequence_index(self, index):
        """Convert ``index`` to an ``int`` or a ``slice``.

        Integers and slices are returned as-is. Strings are parsed either as
        an integer (``"1"``) or as a slice (``"1:"``, ``"::2"``). Raises
        ``ValueError`` if the index cannot be parsed.
        """
        if isinstance(index, (int, slice)):
            return index
        if not isinstance(index, str):
            raise ValueError
        if ":" not in index:
            return int(index)
        if index.count(":") > 2:
            raise ValueError
        return slice(*[int(i) if i else None for i in index.split(":")])

    def _variable_type_supports_item_assign(self, var):
        return hasattr(var, "__setitem__") and callable(var.__setitem__)

    def _raise_cannot_set_type(self, value, expected):
        value_type = type_name(value)
        raise VariableError(f"Expected {expected}-like value, got {value_type}.")

    def _handle_list_and_dict(self, value, identifier):
        """Convert ``value`` according to the variable ``identifier``.

        ``@`` requires a list-like value and converts it to a ``list``, ``&``
        requires a dict-like value and converts it to a ``DotDict``. Other
        identifiers return the value unchanged. Raises ``VariableError`` if
        the value has a wrong type.
        """
        if identifier == "@":
            if not is_list_like(value):
                self._raise_cannot_set_type(value, "list")
            value = list(value)
        if identifier == "&":
            if not is_dict_like(value):
                self._raise_cannot_set_type(value, "dictionary")
            value = DotDict(value)
        return value

    def _item_assign(self, name, items, value, variables):
        """Set ``value`` to an item of an existing variable.

        All items except the last one select the container to modify; the
        last item is the index or key to set. Returns the original ``value``.
        Raises ``VariableError`` if the container does not support item
        assignment or if setting the item fails.
        """
        *nested, item = items
        decorated_nested_items = "".join(f"[{nested_item}]" for nested_item in nested)
        var = variables.replace_scalar(f"${name[1:]}{decorated_nested_items}")
        if not self._variable_type_supports_item_assign(var):
            raise VariableError(
                f"Variable '{name}{decorated_nested_items}' is {type_name(var)} "
                "and does not support item assignment."
            )
        selector = variables.replace_scalar(item)
        if isinstance(var, MutableSequence):
            # Best effort: an unparsable index is used as-is and the
            # assignment below reports the failure.
            try:
                selector = self._parse_sequence_index(selector)
            except ValueError:
                pass
        try:
            var[selector] = self._handle_list_and_dict(value, name[0])
        except Exception:
            raise VariableError(
                f"Setting value to {type_name(var)} variable "
                f"'{name}{decorated_nested_items}' at index [{item}] failed: "
                f"{get_error_message()}"
            )
        return value

    def _normal_assign(self, name, value, variables):
        try:
            variables[name] = value
        except DataError as err:
            raise VariableError(f"Setting variable '{name}' failed: {err}")
        # Always return the actually assigned value.
        return value if name[0] == "$" else variables[name]
class ReturnValueResolver:
    """Base class for splitting a return value between assigned variables.

    Use ``from_assignment`` to get the resolver matching an assignment.
    Subclasses implement ``resolve``.
    """

    @classmethod
    def from_assignment(cls, assignment):
        """Return a resolver suitable for the given ``assignment``.

        The choice depends on the number of variables and on whether any of
        them is a list variable (starts with ``@``).
        """
        if not assignment:
            return NoReturnValueResolver()
        if len(assignment) == 1:
            return OneReturnValueResolver(assignment[0])
        has_list = any(variable[0] == "@" for variable in assignment)
        if has_list:
            resolver_class = ScalarsAndListReturnValueResolver
        else:
            resolver_class = ScalarsOnlyReturnValueResolver
        return resolver_class(assignment)

    def resolve(self, return_value):
        """Return the resolved assignments; implemented by subclasses."""
        raise NotImplementedError

    def _split_assignment(self, assignment):
        # Imported here instead of at the module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from robot.running import TypeInfo

        match = search_variable(assignment, parse_type=True)
        if match.type:
            info = TypeInfo.from_variable(match)
        else:
            info = None
        return match.name, info, match.items

    def _convert(self, return_value, type_info):
        # Without type information the value is passed through unchanged.
        if type_info:
            return type_info.convert(return_value, kind="Return value")
        return return_value
class NoReturnValueResolver(ReturnValueResolver):
    """Resolver used when there are no variables to assign."""

    def resolve(self, return_value):
        # The return value is ignored; a new empty list is returned each time.
        nothing_to_assign = []
        return nothing_to_assign
class OneReturnValueResolver(ReturnValueResolver):
    """Resolver used when the return value is assigned to a single variable."""

    def __init__(self, assignment):
        parsed = self._split_assignment(assignment)
        self._name, self._type, self._items = parsed

    def resolve(self, return_value):
        """Return a one-item list of ``(name, items, value)``.

        A ``None`` return value is replaced with a default that depends on
        the variable identifier: ``None`` for ``$``, an empty list for ``@``
        and an empty dictionary for ``&``. The value is then converted using
        the type information of the variable, if it has any.
        """
        if return_value is None:
            identifier = self._name[0]
            # Factories are used so that every call gets a fresh container.
            default_factories = {"$": lambda: None, "@": list, "&": dict}
            return_value = default_factories[identifier]()
        converted = self._convert(return_value, self._type)
        return [(self._name, self._items, converted)]
class MultiReturnValueResolver(ReturnValueResolver):
    """Base class for resolvers assigning a return value to several variables.

    Subclasses implement ``_validate`` to check the number of returned
    values and ``_resolve`` to split the values between the variables.
    """

    def __init__(self, assignments):
        parsed = [self._split_assignment(assign) for assign in assignments]
        self._names = [name for name, _, _ in parsed]
        self._types = [type_ for _, type_, _ in parsed]
        self._items = [items for _, _, items in parsed]
        # Number of values the return value must contain.
        self._minimum = len(assignments)

    def resolve(self, return_value):
        """Return a list of ``(name, items, value)`` tuples.

        Raises ``VariableError`` if the return value is not list-like or
        if it contains a wrong number of values.
        """
        values = self._convert_to_list(return_value)
        self._validate(len(values))
        return self._resolve(values)

    def _convert_to_list(self, return_value):
        # `None` means every variable gets `None`.
        if return_value is None:
            return [None] * self._minimum
        # Strings are iterable but are not accepted as list-like values.
        if isinstance(return_value, str):
            self._raise_expected_list(return_value)
        try:
            values = list(return_value)
        except TypeError:
            self._raise_expected_list(return_value)
        else:
            return values

    def _raise_expected_list(self, ret):
        self._raise(f"Expected list-like value, got {type_name(ret)}.")

    def _raise(self, error):
        raise VariableError(f"Cannot set variables: {error}")

    def _validate(self, return_count):
        raise NotImplementedError

    def _resolve(self, return_value):
        raise NotImplementedError
class ScalarsOnlyReturnValueResolver(MultiReturnValueResolver):
    """Resolver for several variables when none of them is a list variable.

    The return value must contain exactly one value per variable.
    """

    def _validate(self, return_count):
        expected = self._minimum
        if return_count == expected:
            return
        self._raise(f"Expected {expected} return values, got {return_count}.")

    def _resolve(self, return_value):
        # Convert each value using the type information of its variable.
        converted = []
        for value, type_info in zip(return_value, self._types):
            converted.append(self._convert(value, type_info))
        return list(zip(self._names, self._items, converted))
class ScalarsAndListReturnValueResolver(MultiReturnValueResolver):
    """Resolver for several variables when one of them is a list variable.

    Variables before and after the list variable get one value each; the
    list variable gets all the values in between, possibly none.
    """

    def __init__(self, assignments):
        super().__init__(assignments)
        # The list variable does not require a value of its own.
        self._minimum -= 1

    def _validate(self, return_count):
        if return_count >= self._minimum:
            return
        self._raise(
            f"Expected {self._minimum} or more return values, got {return_count}."
        )

    def _resolve(self, return_value):
        identifiers = [name[0] for name in self._names]
        list_index = identifiers.index("@")
        after_list = list_index + 1
        # Values not needed by the other variables go to the list variable.
        list_len = len(return_value) - len(self._names) + 1
        list_end = list_index + list_len
        resolved = list(
            zip(
                self._names[:list_index],
                self._items[:list_index],
                return_value[:list_index],
            )
        )
        resolved.append(
            (
                self._names[list_index],
                self._items[list_index],
                return_value[list_index:list_end],
            )
        )
        resolved.extend(
            zip(
                self._names[after_list:],
                self._items[after_list:],
                return_value[list_end:],
            )
        )
        converted = []
        for (name, items, value), info in zip(resolved, self._types):
            converted.append((name, items, self._convert(value, info)))
        return converted