Source code for robot.running.userkeywordrunner

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from itertools import chain
from typing import TYPE_CHECKING

from robot.errors import (DataError, ExecutionFailed, ExecutionPassed, ExecutionStatus,
                          PassExecution, ReturnFromKeyword, UserKeywordExecutionFailed,
                          VariableError)
from robot.result import Keyword as KeywordResult
from robot.utils import DotDict, getshortdoc, prepr, split_tags_from_doc
from robot.variables import is_list_variable, VariableAssignment

from .arguments import ArgumentSpec, DefaultValue
from .bodyrunner import BodyRunner, KeywordRunner
from .model import Keyword as KeywordData
from .statusreporter import StatusReporter
from .timeouts import KeywordTimeout

if TYPE_CHECKING:
    from .resourcemodel import UserKeyword


class UserKeywordRunner:
    """Execute a user keyword, or dry-run it, and fill in its result object."""

    def __init__(self, keyword: 'UserKeyword', name: 'str|None' = None):
        """Store the keyword to run and the name to report it under.

        :param keyword: The user keyword this runner executes.
        :param name: Name to use in results; falls back to ``keyword.name``
            when not given (or empty).
        """
        self.keyword = keyword
        self.name = name or keyword.name
        # Messages written to the output before arguments are resolved.
        self.pre_run_messages = ()

    def run(self, data: KeywordData, result: KeywordResult, context, run=True):
        """Run the keyword and assign its return value.

        The keyword is bound to ``data`` and ``result`` is configured before
        anything is executed. When ``run`` is false nothing is executed and
        ``None`` is returned.

        :return: The keyword's return value, or ``None`` when not run.
        """
        bound = self.keyword.bind(data)
        assignment = VariableAssignment(data.assign)
        self._config_result(result, data, bound, assignment, context.variables)
        with StatusReporter(data, result, context, run, implementation=bound):
            if bound.private:
                context.warn_on_invalid_private_call(bound)
            with assignment.assigner(context) as assigner:
                if not run:
                    return None
                outcome = self._run(data, bound, result, context)
                assigner.assign(outcome)
                return outcome

    def _config_result(self, result: KeywordResult, data: KeywordData,
                       kw: 'UserKeyword', assignment, variables):
        """Configure ``result`` with name, owner, doc, args, assign, tags and type.

        Variables in the documentation and tags are replaced with
        ``ignore_errors=True``. Tags split from the documentation are appended
        after the keyword's own tags.
        """
        raw_doc = variables.replace_string(kw.doc, ignore_errors=True)
        doc, doc_tags = split_tags_from_doc(raw_doc)
        tags = variables.replace_list(kw.tags, ignore_errors=True) + doc_tags
        result.config(
            name=self.name,
            owner=kw.owner.name,
            doc=getshortdoc(doc),
            args=data.args,
            assign=tuple(assignment),
            tags=tags,
            type=data.type,
        )

    def _emit_pre_run_messages(self, context):
        """Write every pre-run message, if there are any, to the output."""
        for message in self.pre_run_messages or ():
            context.output.message(message)

    def _run(self, data: KeywordData, kw: 'UserKeyword', result: KeywordResult,
             context):
        """Resolve arguments, execute the keyword and return its value.

        A failure that cannot be continued from is raised immediately. A
        continuable failure is raised only after the return value has been
        resolved and attached to it as ``return_value``.
        """
        self._emit_pre_run_messages(context)
        variables = context.variables
        positional, named = self._resolve_arguments(kw, data.args, variables)
        with context.user_keyword(kw):
            self._set_arguments(kw, positional, named, context)
            timeout = None
            if kw.timeout:
                timeout = KeywordTimeout(kw.timeout, variables)
                result.timeout = str(timeout)
            with context.timeout(timeout):
                failure, return_value = self._execute(kw, result, context)
                if failure and not failure.can_continue(context):
                    raise failure
                return_value = self._handle_return_value(return_value, variables)
                if failure:
                    failure.return_value = return_value
                    raise failure
                return return_value

    def _resolve_arguments(self, kw: 'UserKeyword', args, variables=None):
        """Delegate argument resolution to ``kw.resolve_arguments``."""
        return kw.resolve_arguments(args, variables)

    def _set_arguments(self, kw: 'UserKeyword', positional, named, context):
        """Map arguments to the keyword's spec and store them as variables.

        Also hands the output a callable producing the trace-level
        ``Arguments: [ ... ]`` message.
        """
        variables = context.variables
        positional, named = kw.args.map(positional, named, replace_defaults=False)
        self._set_variables(kw.args, positional, named, variables)

        def trace_message():
            return self._trace_log_args_message(kw, variables)

        context.output.trace(trace_message, write_if_flat=False)

    def _set_variables(self, spec: ArgumentSpec, positional, named, variables):
        """Store argument values into ``variables`` under their variable names.

        Positional and named-only arguments become ``${name}`` variables,
        extra positional values go to ``@{varargs}`` and extra named values
        to ``&{kwargs}`` (as a ``DotDict``).
        """
        positional, var_positional = self._separate_positional(spec, positional)
        named_only, var_named = self._separate_named(spec, named)
        scalars = chain(zip(spec.positional, positional), named_only)
        for name, value in scalars:
            # Defaults were left unresolved by the mapping step; resolve now.
            if isinstance(value, DefaultValue):
                value = value.resolve(variables)
            variables[f'${{{name}}}'] = value
        if spec.var_positional:
            variables[f'@{{{spec.var_positional}}}'] = var_positional
        if spec.var_named:
            variables[f'&{{{spec.var_named}}}'] = DotDict(var_named)

    def _separate_positional(self, spec: ArgumentSpec, positional):
        """Split ``positional`` into regular values and var-positional extras.

        Without a var-positional argument in the spec everything is regular
        and the extras list is empty.
        """
        if spec.var_positional:
            split_at = len(spec.positional)
            return positional[:split_at], positional[split_at:]
        return positional, []

    def _separate_named(self, spec: ArgumentSpec, named):
        """Split ``(name, value)`` pairs into named-only and var-named lists."""
        named_only, var_named = [], []
        for name, value in named:
            if name in spec.named_only:
                named_only.append((name, value))
            else:
                var_named.append((name, value))
        return named_only, var_named

    def _trace_log_args_message(self, kw: 'UserKeyword', variables):
        """Build the trace log message listing the keyword's argument values."""
        arg_names = self._format_args_for_trace_logging(kw.args)
        return self._format_trace_log_args_message(arg_names, variables)

    def _format_args_for_trace_logging(self, spec: ArgumentSpec):
        """Return variable names for positional, varargs and kwargs arguments.

        Named-only arguments are not included in the returned list.
        """
        names = [f'${{{arg}}}' for arg in spec.positional]
        if spec.var_positional:
            names.append(f'@{{{spec.var_positional}}}')
        if spec.var_named:
            names.append(f'&{{{spec.var_named}}}')
        return names

    def _format_trace_log_args_message(self, args, variables):
        """Format ``name=value`` pairs for ``args`` using values in ``variables``."""
        pairs = (f'{name}={prepr(variables[name])}' for name in args)
        joined = ' | '.join(pairs)
        return f'Arguments: [ {joined} ]'

    def _execute(self, kw: 'UserKeyword', result: KeywordResult, context):
        """Run setup, body and teardown of ``kw``.

        :return: A ``(status, return_value)`` tuple. ``status`` is a failure,
            a "passed" execution status, or ``None``.
        :raises DataError: If the keyword has an error, an empty body or an
            empty name.
        """
        if kw.error:
            raise DataError(kw.error)
        if not kw.body:
            raise DataError('User keyword cannot be empty.')
        if not kw.name:
            raise DataError('User keyword name cannot be empty.')
        if context.dry_run and kw.tags.robot('no-dry-run'):
            return None, None
        error = None
        success = None
        return_value = None
        if kw.setup:
            error = self._run_setup_or_teardown(kw.setup, result.setup, context)
        # The handler order below is significant and kept as in the original.
        try:
            # The body is processed even after a failed setup, but with
            # ``run=False`` so that nothing is actually executed.
            BodyRunner(context, run=not error).run(kw, result)
        except ReturnFromKeyword as returned:
            return_value = returned.return_value
            error = returned.earlier_failures
        except ExecutionPassed as passed:
            success = passed
            error = passed.earlier_failures
            if error:
                error.continue_on_failure = False
        except ExecutionFailed as failed:
            error = failed
        if kw.teardown:
            with context.keyword_teardown(error):
                td_error = self._run_setup_or_teardown(kw.teardown,
                                                       result.teardown, context)
        else:
            td_error = None
        if error or td_error:
            error = UserKeywordExecutionFailed(error, td_error)
        return error or success, return_value

    def _handle_return_value(self, return_value, variables):
        """Replace variables in the return value and unwrap a single item.

        :return: ``None`` for a falsy return value, the single item when the
            resolved list has exactly one item and the original contained no
            list variable, and the resolved list otherwise.
        :raises VariableError: If replacing variables fails.
        """
        if not return_value:
            return None
        contains_list_var = any(is_list_variable(item) for item in return_value)
        try:
            resolved = variables.replace_list(return_value)
        except DataError as err:
            raise VariableError('Replacing variables from keyword return '
                                f'value failed: {err}')
        if len(resolved) == 1 and not contains_list_var:
            return resolved[0]
        return resolved

    def _run_setup_or_teardown(self, data: KeywordData, result: KeywordResult,
                               context):
        """Run a setup or teardown keyword and return its failure, if any.

        Returns ``None`` when the resolved name is empty or ``NONE``
        (case-insensitively), when the keyword passes, or when resolving the
        name fails in dry-run mode.
        """
        try:
            name = context.variables.replace_string(data.name)
        except DataError as err:
            return None if context.dry_run else ExecutionFailed(err.message,
                                                                syntax=True)
        if name.upper() in ('', 'NONE'):
            return None
        try:
            KeywordRunner(context).run(data, result, name)
        except PassExecution:
            pass
        except ExecutionStatus as status:
            return status
        return None

    def dry_run(self, data: KeywordData, result: KeywordResult, context):
        """Validate the keyword call without resolving argument variables."""
        bound = self.keyword.bind(data)
        assignment = VariableAssignment(data.assign)
        self._config_result(result, data, bound, assignment, context.variables)
        with StatusReporter(data, result, context, implementation=bound):
            assignment.validate_assignment()
            self._dry_run(data, bound, result, context)

    def _dry_run(self, data: KeywordData, kw: 'UserKeyword',
                 result: KeywordResult, context):
        """Resolve arguments without variables, execute and raise any error."""
        self._emit_pre_run_messages(context)
        self._resolve_arguments(kw, data.args)
        with context.user_keyword(kw):
            if kw.timeout:
                timeout = KeywordTimeout(kw.timeout, context.variables)
                result.timeout = str(timeout)
            error, _ = self._execute(kw, result, context)
            if error:
                raise error
class EmbeddedArgumentsRunner(UserKeywordRunner):
    """Runner for user keywords whose name carries embedded arguments."""

    def __init__(self, keyword: 'UserKeyword', name: str):
        """Match ``name`` against the keyword's embedded-argument pattern.

        The captured groups are stored in ``embedded_args``.
        """
        super().__init__(keyword, name)
        match = keyword.embedded.match(name)
        self.embedded_args = match.groups()

    def _resolve_arguments(self, kw: 'UserKeyword', args, variables=None):
        """Resolve normal arguments and, given variables, embedded ones too.

        When ``variables`` is truthy, variables in the embedded values are
        replaced and ``embedded_args`` is overwritten with the result of
        ``kw.embedded.map``.
        """
        resolved = super()._resolve_arguments(kw, args, variables)
        if not variables:
            return resolved
        values = [variables.replace_scalar(item) for item in self.embedded_args]
        self.embedded_args = kw.embedded.map(values)
        return resolved

    def _set_arguments(self, kw: 'UserKeyword', positional, named, context):
        """Set embedded arguments as ``${name}`` variables, then the normal ones."""
        scope = context.variables
        for arg_name, arg_value in self.embedded_args:
            scope[f'${{{arg_name}}}'] = arg_value
        super()._set_arguments(kw, positional, named, context)

    def _trace_log_args_message(self, kw: 'UserKeyword', variables):
        """Build the trace message with embedded arguments listed first."""
        names = [f'${{{arg}}}' for arg in kw.embedded.args]
        names.extend(self._format_args_for_trace_logging(kw.args))
        return self._format_trace_log_args_message(names, variables)

    def _config_result(self, result: KeywordResult, data: KeywordData,
                       kw: 'UserKeyword', assignment, variables):
        """Configure ``result`` as the base class does and set ``source_name``."""
        super()._config_result(result, data, kw, assignment, variables)
        result.source_name = kw.name