# Copyright 2008-2015 Nokia Networks
# Copyright 2016- Robot Framework Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Callable, Sequence, TYPE_CHECKING
from robot.errors import DataError
from robot.utils import DotDict, safe_str, Secret, split_from_equals, type_name, unescape
from .resolvable import Resolvable
from .search import is_dict_variable, is_list_variable, search_variable
if TYPE_CHECKING:
from robot.running import Var, Variable
from .store import VariableStore
[docs]
class VariableTableSetter:
    """Adds variables to a variable store as lazily resolved resolvers."""

    def __init__(self, store: "VariableStore"):
        self.store = store

    def set(self, variables: "Sequence[Variable]", overwrite: bool = False):
        """Add a resolver for each of the given ``variables`` to the store.

        A `DataError` raised while creating or adding a resolver does not
        stop the processing. It is reported through the failing variable's
        own ``report_error`` method and the remaining variables are handled
        normally.
        """
        for variable in variables:
            try:
                self._add_to_store(variable, overwrite)
            except DataError as error:
                variable.report_error(str(error))

    def _add_to_store(self, variable, overwrite):
        # Create a resolver for a single variable and store it by its name.
        resolver = VariableResolver.from_variable(variable)
        self.store.add(resolver.name, resolver, overwrite)
[docs]
class VariableResolver(Resolvable):
    """Base class for objects that resolve a variable's value lazily.

    A resolver stores the unresolved value items together with the optional
    variable name and type. Subclasses implement `_replace_variables` to do
    the actual variable replacement. `resolve` adds detection of recursive
    definitions, optional type conversion and caching of the result.
    """

    def __init__(
        self,
        value: Sequence[str],
        name: "str|None" = None,
        type: "str|None" = None,
        error_reporter: "Callable[[str], None]|None" = None,
    ):
        self.value = tuple(value)
        self.name = name
        self.type = type
        self.error_reporter = error_reporter
        # `resolving` is set while `resolve` is replacing variables and is
        # used for detecting recursion. `resolved` tells that `value` already
        # contains the final value.
        self.resolving = False
        self.resolved = False

    @classmethod
    def from_name_and_value(
        cls,
        name: str,
        value: "str|Sequence[str]",
        separator: "str|None" = None,
        error_reporter: "Callable[[str], None]|None" = None,
    ) -> "VariableResolver":
        """Create a resolver selected by the identifier used in ``name``.

        Names using the ``$`` identifier get a `ScalarVariableResolver`,
        names using ``@`` a `ListVariableResolver` and names using ``&``
        a `DictVariableResolver`. The name is parsed so that a possible
        type is separated from it.

        Raises `DataError` if ``name`` is not accepted as an assignment
        target or if ``separator`` is given with a non-scalar variable.
        """
        match = search_variable(name, parse_type=True)
        if not match.is_assign(allow_nested=True):
            raise DataError(f"Invalid variable name '{name}'.")
        if match.identifier == "$":
            return ScalarVariableResolver(
                value,
                separator,
                match.name,
                match.type,
                error_reporter,
            )
        if separator is not None:
            raise DataError("Only scalar variables support separators.")
        klass = {"@": ListVariableResolver, "&": DictVariableResolver}[match.identifier]
        return klass(value, match.name, match.type, error_reporter)

    @classmethod
    def from_variable(cls, var: "Var|Variable") -> "VariableResolver":
        """Create a resolver from a variable model object.

        The object's ``report_error`` method, if it has one, is used as
        the error reporter of the created resolver.

        Raises `DataError` if the object itself has an error or if creating
        the resolver with `from_name_and_value` fails.
        """
        if var.error:
            raise DataError(var.error)
        return cls.from_name_and_value(
            var.name,
            var.value,
            var.separator,
            getattr(var, "report_error", None),
        )

    def resolve(self, variables) -> Any:
        """Resolve the value using ``variables`` and return it.

        The value is resolved only once. The result replaces ``self.value``
        and later calls return it directly. If a type has been set, the
        replaced value is converted to that type. If a name has been set,
        variables inside it are replaced as well.

        Raises `DataError` if the resolver is already being resolved, which
        indicates a recursive variable definition, or if type conversion
        fails.
        """
        if self.resolving:
            raise DataError("Recursive variable definition.")
        if not self.resolved:
            self.resolving = True
            try:
                value = self._replace_variables(variables)
            finally:
                # Reset the flag also on failure so that a later attempt is
                # not misreported as recursion.
                self.resolving = False
            self.value = self._convert(value, self.type) if self.type else value
            if self.name:
                # Nested variables are allowed in names. Replace them in the
                # base name, i.e. in the part between the two-character
                # prefix such as `${` and the closing `}`.
                base = variables.replace_string(self.name[2:-1])
                self.name = self.name[:2] + base + "}"
            self.resolved = True
        return self.value

    def _replace_variables(self, variables) -> Any:
        """Replace variables in the value. Must be implemented by subclasses."""
        raise NotImplementedError

    def _handle_secrets(self, value, replace_scalar):
        """Replace ``$`` and ``%`` variables in ``value`` in a secret context.

        If ``value`` as a whole is a single variable, the replaced value is
        returned and wrapped into a `Secret` when the identifier is ``%``.
        Otherwise handling is delegated to `_handle_embedded_secrets`.
        """
        match = search_variable(value, identifiers="$%")
        if match.is_variable():
            value = replace_scalar(match.match)
            return Secret(value) if match.identifier == "%" else value
        return self._handle_embedded_secrets(match, replace_scalar)

    def _handle_embedded_secrets(self, match, replace_scalar):
        """Build a string from literal text and embedded variables.

        The result is wrapped into a `Secret` if any embedded variable used
        the ``%`` identifier or resolved to a `Secret`. Otherwise a plain
        string is returned.
        """
        parts = []
        secret_seen = False
        while match:
            value = replace_scalar(match.match)
            if match.identifier == "%":
                secret_seen = True
            elif isinstance(value, Secret):
                # Use the wrapped value when joining. The combined string
                # is wrapped into a new `Secret` at the end.
                value = value.value
                secret_seen = True
            parts.extend([unescape(match.before), value])
            match = search_variable(match.after, identifiers="$%")
        # The last search found no variable. Its whole string is the
        # remaining literal text.
        parts.append(unescape(match.string))
        value = "".join(safe_str(p) for p in parts)
        return Secret(value) if secret_seen else value

    def _convert(self, value, type_):
        """Convert ``value`` to the type specified by the ``type_`` hint.

        Raises `DataError` if the conversion fails with a `ValueError` or
        a `TypeError`. The original exception is chained as the cause.
        """
        # NOTE(review): imported locally, presumably to avoid a circular
        # import with `robot.running` -- confirm before moving.
        from robot.running import TypeInfo

        info = TypeInfo.from_type_hint(type_)
        try:
            return info.convert(value, kind="Value")
        except (ValueError, TypeError) as err:
            # Chain explicitly so that the root cause is preserved as
            # `__cause__` and not only as implicit context.
            raise DataError(str(err)) from err

    def report_error(self, error):
        """Pass ``error`` to the configured error reporter.

        Raises `DataError` containing the reported error if no error
        reporter has been set.
        """
        if self.error_reporter:
            self.error_reporter(error)
        else:
            raise DataError(f"Error reporter not set. Reported error was: {error}")

    def _is_secret_type(self, typ=None) -> bool:
        """Tell whether ``typ``, or ``self.type`` if not given, is ``Secret``.

        The comparison is done against the title-cased type name, so it is
        case-insensitive.
        """
        typ = typ or self.type
        return bool(typ and typ.title() == "Secret")
[docs]
class ScalarVariableResolver(VariableResolver):
    """Resolver for scalar variables.

    The value can be a single item or multiple items that are joined
    together using a separator.
    """

    def __init__(
        self,
        value: "str|Sequence[str]",
        separator: "str|None" = None,
        name=None,
        type=None,
        error_reporter=None,
    ):
        items, effective_separator = self._get_value_and_separator(value, separator)
        super().__init__(items, name, type, error_reporter)
        self.separator = effective_separator

    def _get_value_and_separator(self, value, separator):
        # Normalize the value to a sequence and detect a separator given as
        # the first item using the `SEPARATOR=<sep>` syntax. An explicitly
        # given separator disables the detection.
        if isinstance(value, str):
            return [value], separator
        if separator is not None or not value:
            return value, separator
        marker = "SEPARATOR="
        first = value[0]
        if not first.startswith(marker):
            return value, separator
        return value[1:], first[len(marker):]

    def _replace_variables(self, variables):
        items = self.value
        configured = self.separator
        if not self._is_single_value(items, configured):
            # Multiple items, or an explicit separator, always produce
            # a string. The default separator is a single space.
            if configured is None:
                joiner = " "
            else:
                joiner = variables.replace_string(configured)
            replaced = variables.replace_list(items)
            return joiner.join(map(str, replaced))
        # A single item is resolved as a scalar. With the secret type
        # it goes through the secret aware replacement.
        only = items[0]
        if self._is_secret_type():
            return self._handle_secrets(only, variables.replace_scalar)
        return variables.replace_scalar(only)

    def _is_single_value(self, value, separator):
        # True only when there is no separator and the sole item is not
        # a list variable.
        if separator is not None or len(value) != 1:
            return False
        return not is_list_variable(value[0])
[docs]
class ListVariableResolver(VariableResolver):
    """Resolver for list variables."""

    def _replace_variables(self, variables):
        if not self._is_secret_type():
            return variables.replace_list(self.value)
        # With the secret type each item is handled separately. Items that
        # are list variables contribute all their values and other items go
        # through the secret aware replacement.
        resolved = []
        for item in self.value:
            if not is_list_variable(item):
                resolved.append(self._handle_secrets(item, variables.replace_scalar))
                continue
            resolved.extend(variables.replace_scalar(item))
        return resolved

    def _convert(self, value, type_):
        # The configured type is the type of the list items.
        list_type = f"list[{type_}]"
        return super()._convert(value, list_type)
[docs]
class DictVariableResolver(VariableResolver):
    """Resolver for dictionary variables.

    Value items must either use the ``name=value`` syntax or be dictionary
    variables themselves. Items are validated and split when the resolver
    is created.
    """

    def __init__(self, value: Sequence[str], name=None, type=None, error_reporter=None):
        super().__init__(tuple(self._yield_items(value)), name, type, error_reporter)

    def _yield_items(self, values):
        """Yield dictionary variables as-is and other items as (name, value).

        Raises `DataError` if an item is neither a dictionary variable nor
        in the ``name=value`` format.
        """
        for item in values:
            if is_dict_variable(item):
                yield item
            else:
                name, value = split_from_equals(item)
                if value is None:
                    raise DataError(
                        f"Invalid dictionary variable item '{item}'. Items must use "
                        f"'name=value' syntax or be dictionary variables themselves."
                    )
                yield name, value

    def _replace_variables(self, variables):
        """Create a `DotDict` from the items with variables replaced.

        Raises `DataError` if creating the dictionary fails with
        a `TypeError`. The original exception is chained as the cause.
        """
        try:
            return DotDict(self._yield_replaced(self.value, variables.replace_scalar))
        except TypeError as err:
            # Chain explicitly so that the root cause is preserved as
            # `__cause__` and not only as implicit context.
            raise DataError(f"Creating dictionary variable failed: {err}") from err

    def _yield_replaced(self, values, replace_scalar):
        """Yield (key, value) pairs with variables replaced.

        The type can be given as ``value_type`` or ``key_type=value_type``.
        Keys and values whose type is ``Secret`` use the secret aware
        replacement and others use ``replace_scalar`` directly.
        """
        if not self.type:
            secret_key = secret_value = False
        elif "=" not in self.type:
            # Without `=` the type applies to values only.
            secret_key = False
            secret_value = self._is_secret_type(self.type)
        else:
            kt, vt = self.type.split("=", 1)
            secret_key = self._is_secret_type(kt)
            secret_value = self._is_secret_type(vt)
        for item in values:
            if isinstance(item, tuple):
                key, value = item
                if secret_key:
                    key = self._handle_secrets(key, replace_scalar)
                else:
                    key = replace_scalar(key)
                if secret_value:
                    value = self._handle_secrets(value, replace_scalar)
                else:
                    value = replace_scalar(value)
                yield key, value
            else:
                # Non-tuple items are the dictionary variables that
                # `_yield_items` passed through. Include all their items.
                yield from replace_scalar(item).items()

    def _convert(self, value, type_):
        """Convert ``value`` to a dictionary with the configured types.

        If ``type_`` has no ``=``, it is used as the value type and the key
        type is ``Any``.
        """
        k_type, v_type = type_.split("=", 1) if "=" in type_ else ("Any", type_)
        return super()._convert(value, f"dict[{k_type}, {v_type}]")