Source code for robot.running.userkeywordrunner

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from typing import TYPE_CHECKING

from robot.errors import (
    DataError, ExecutionFailed, ExecutionPassed, ExecutionStatus, PassExecution,
    ReturnFromKeyword, UserKeywordExecutionFailed, VariableError
)
from robot.result import Keyword as KeywordResult
from robot.utils import DotDict, getshortdoc, prepr, split_tags_from_doc
from robot.variables import is_list_variable, VariableAssignment

from .arguments import ArgumentSpec, DefaultValue
from .bodyrunner import BodyRunner, KeywordRunner
from .model import Keyword as KeywordData
from .statusreporter import StatusReporter
from .timeouts import KeywordTimeout

if TYPE_CHECKING:
    from .resourcemodel import UserKeyword


[docs] class UserKeywordRunner: def __init__(self, keyword: "UserKeyword", name: "str|None" = None): self.keyword = keyword self.name = name or keyword.name self.pre_run_messages = ()
[docs] def run(self, data: KeywordData, result: KeywordResult, context, run=True): kw = self.keyword.bind(data) assignment = VariableAssignment(data.assign) self._config_result(result, data, kw, assignment, context.variables) with StatusReporter(data, result, context, run, implementation=kw): self._validate(kw) if kw.private: context.warn_on_invalid_private_call(kw) if not run: return None with assignment.assigner(context) as assigner: return_value = self._run(data, kw, result, context) assigner.assign(return_value) return return_value
def _config_result( self, result: KeywordResult, data: KeywordData, kw: "UserKeyword", assignment, variables, ): args = tuple(data.args) if data.named_args: args += tuple(f"{n}={v}" for n, v in data.named_args.items()) doc = variables.replace_string(kw.doc, ignore_errors=True) doc, tags = split_tags_from_doc(doc) tags = variables.replace_list(kw.tags, ignore_errors=True) + tags result.config( name=self.name, owner=kw.owner.name, doc=getshortdoc(doc), args=args, assign=tuple(assignment), tags=tags, type=data.type, ) def _validate(self, kw: "UserKeyword"): if kw.error: raise DataError(kw.error) if not kw.name: raise DataError("User keyword name cannot be empty.") if not kw.body: raise DataError("User keyword cannot be empty.") def _run( self, data: KeywordData, kw: "UserKeyword", result: KeywordResult, context, ): if self.pre_run_messages: for message in self.pre_run_messages: context.output.message(message) variables = context.variables positional, named = self._resolve_arguments(data, kw, variables) with context.user_keyword(kw): self._set_arguments(kw, positional, named, context) if kw.timeout: timeout = KeywordTimeout(kw.timeout, variables) result.timeout = str(timeout) if timeout else None else: timeout = None with context.keyword_timeout(timeout): exception, return_value = self._execute(kw, result, context) if exception and not exception.can_continue(context): if context.in_teardown and exception.keyword_timeout: # Allow execution to continue on teardowns after timeout. # https://github.com/robotframework/robotframework/issues/3398 exception.keyword_timeout = False raise exception return_value = self._handle_return_value(return_value, variables) if exception: exception.return_value = return_value raise exception return return_value def _resolve_arguments(self, data: KeywordData, kw: "UserKeyword", variables=None): return kw.resolve_arguments(data.args, data.named_args, variables) def _set_arguments(self, kw: "UserKeyword", positional, named, context): variables = context.variables positional, named = kw.args.map(positional, named, replace_defaults=False) self._set_variables(kw.args, positional, named, variables) context.output.trace( lambda: self._trace_log_args_message(kw, variables), write_if_flat=False ) def _set_variables(self, spec: ArgumentSpec, positional, named, variables): positional, var_positional = self._separate_positional(spec, positional) named_only, var_named = self._separate_named(spec, named) for name, value in (*zip(spec.positional, positional), *named_only): if isinstance(value, DefaultValue): value = value.resolve(variables) info = spec.types.get(name) if info: value = info.convert(value, name, kind="Default value for argument") variables[f"${{{name}}}"] = value if spec.var_positional: variables[f"@{{{spec.var_positional}}}"] = var_positional if spec.var_named: variables[f"&{{{spec.var_named}}}"] = DotDict(var_named) def _separate_positional(self, spec: ArgumentSpec, positional): if not spec.var_positional: return positional, [] count = len(spec.positional) return positional[:count], positional[count:] def _separate_named(self, spec: ArgumentSpec, named): named_only = [] var_named = [] for name, value in named: target = named_only if name in spec.named_only else var_named target.append((name, value)) return named_only, var_named def _trace_log_args_message(self, kw: "UserKeyword", variables): return self._format_trace_log_args_message( self._format_args_for_trace_logging(kw.args), variables ) def _format_args_for_trace_logging(self, spec: ArgumentSpec): args = [f"${{{arg}}}" for arg in spec.positional] if spec.var_positional: args.append(f"@{{{spec.var_positional}}}") if spec.named_only: args.extend(f"${{{arg}}}" for arg in spec.named_only) if spec.var_named: args.append(f"&{{{spec.var_named}}}") return args def _format_trace_log_args_message(self, args, variables): args = " | ".join(f"{name}={prepr(variables[name])}" for name in args) return f"Arguments: [ {args} ]" def _execute(self, kw: "UserKeyword", result: KeywordResult, context): if context.dry_run and kw.tags.robot("no-dry-run"): return None, None error = success = return_value = None if kw.setup: error = self._run_setup_or_teardown(kw.setup, result.setup, context) try: BodyRunner(context, run=not error).run(kw, result) except ReturnFromKeyword as exception: return_value = exception.return_value error = exception.earlier_failures except ExecutionPassed as exception: success = exception error = exception.earlier_failures if error: error.continue_on_failure = False except ExecutionFailed as exception: error = exception if kw.teardown: with context.keyword_teardown(error): td_error = self._run_setup_or_teardown( kw.teardown, result.teardown, context ) else: td_error = None if error or td_error: error = UserKeywordExecutionFailed(error, td_error) return error or success, return_value def _handle_return_value(self, return_value, variables): if not return_value: return None contains_list_var = any(is_list_variable(item) for item in return_value) try: return_value = variables.replace_list(return_value) except DataError as err: raise VariableError( f"Replacing variables from keyword return value failed: {err}" ) if len(return_value) != 1 or contains_list_var: return return_value return return_value[0] def _run_setup_or_teardown(self, data: KeywordData, result: KeywordResult, context): try: KeywordRunner(context).run(data, result, setup_or_teardown=True) except PassExecution: return None except ExecutionStatus as err: return err return None
[docs] def dry_run(self, data: KeywordData, result: KeywordResult, context): kw = self.keyword.bind(data) assignment = VariableAssignment(data.assign) self._config_result(result, data, kw, assignment, context.variables) with StatusReporter(data, result, context, implementation=kw): self._validate(kw) assignment.validate_assignment() self._dry_run(data, kw, result, context)
def _dry_run( self, data: KeywordData, kw: "UserKeyword", result: KeywordResult, context, ): if self.pre_run_messages: for message in self.pre_run_messages: context.output.message(message) self._resolve_arguments(data, kw) with context.user_keyword(kw): if kw.timeout: timeout = KeywordTimeout(kw.timeout, context.variables) result.timeout = str(timeout) error, _ = self._execute(kw, result, context) if error: raise error
[docs] class EmbeddedArgumentsRunner(UserKeywordRunner): def __init__(self, keyword: "UserKeyword", name: str): super().__init__(keyword, name) self.embedded_args = keyword.embedded.parse_args(name) def _resolve_arguments(self, data: KeywordData, kw: "UserKeyword", variables=None): result = super()._resolve_arguments(data, kw, variables) if variables: embedded = [variables.replace_scalar(e) for e in self.embedded_args] self.embedded_args = kw.embedded.map(embedded) return result def _set_arguments(self, kw: "UserKeyword", positional, named, context): variables = context.variables for name, value in self.embedded_args: variables[f"${{{name}}}"] = value super()._set_arguments(kw, positional, named, context) def _trace_log_args_message(self, kw: "UserKeyword", variables): args = [f"${{{arg}}}" for arg in kw.embedded.args] args += self._format_args_for_trace_logging(kw.args) return self._format_trace_log_args_message(args, variables) def _config_result( self, result: KeywordResult, data: KeywordData, kw: "UserKeyword", assignment, variables, ): super()._config_result(result, data, kw, assignment, variables) result.source_name = kw.name