# Copyright 2008-2015 Nokia Networks
# Copyright 2016- Robot Framework Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from robot.errors import DataError
from robot.output import LOGGER
from robot.parsing import File, ModelVisitor, Token
from robot.utils import NormalizedDict
from robot.variables import VariableMatches
from ..model import For, If, IfBranch, TestSuite, TestCase, Try, TryBranch, While
from ..resourcemodel import ResourceFile, UserKeyword
from .settings import FileSettings
[docs]
class SettingsBuilder(ModelVisitor):
def __init__(self, suite: TestSuite, settings: FileSettings):
self.suite = suite
self.settings = settings
[docs]
def visit_Documentation(self, node):
self.suite.doc = node.value
[docs]
def visit_SuiteName(self, node):
self.suite.name = node.value
[docs]
def visit_SuiteSetup(self, node):
self.suite.setup.config(name=node.name, args=node.args,
lineno=node.lineno)
[docs]
def visit_SuiteTeardown(self, node):
self.suite.teardown.config(name=node.name, args=node.args,
lineno=node.lineno)
[docs]
def visit_TestSetup(self, node):
self.settings.test_setup = {'name': node.name, 'args': node.args,
'lineno': node.lineno}
[docs]
def visit_TestTeardown(self, node):
self.settings.test_teardown = {'name': node.name, 'args': node.args,
'lineno': node.lineno}
[docs]
def visit_TestTimeout(self, node):
self.settings.test_timeout = node.value
[docs]
def visit_TestTemplate(self, node):
self.settings.test_template = node.value
[docs]
def visit_LibraryImport(self, node):
self.suite.resource.imports.library(node.name, node.args, node.alias, node.lineno)
[docs]
def visit_ResourceImport(self, node):
self.suite.resource.imports.resource(node.name, node.lineno)
[docs]
def visit_VariablesImport(self, node):
self.suite.resource.imports.variables(node.name, node.args, node.lineno)
[docs]
def visit_VariableSection(self, node):
pass
[docs]
def visit_TestCaseSection(self, node):
pass
[docs]
def visit_KeywordSection(self, node):
pass
[docs]
class SuiteBuilder(ModelVisitor):
def __init__(self, suite: TestSuite, settings: FileSettings):
self.suite = suite
self.settings = settings
self.seen_keywords = NormalizedDict(ignore='_')
self.rpa = None
[docs]
def build(self, model: File):
ErrorReporter(model.source).visit(model)
SettingsBuilder(self.suite, self.settings).visit(model)
self.visit(model)
if self.rpa is not None:
self.suite.rpa = self.rpa
[docs]
def visit_SettingSection(self, node):
pass
[docs]
def visit_Variable(self, node):
self.suite.resource.variables.create(name=node.name,
value=node.value,
separator=node.separator,
lineno=node.lineno,
error=format_error(node.errors))
[docs]
def visit_TestCaseSection(self, node):
if self.rpa is None:
self.rpa = node.tasks
elif self.rpa != node.tasks:
raise DataError('One file cannot have both tests and tasks.')
self.generic_visit(node)
[docs]
def visit_TestCase(self, node):
TestCaseBuilder(self.suite, self.settings).build(node)
[docs]
def visit_Keyword(self, node):
KeywordBuilder(self.suite.resource, self.settings, self.seen_keywords).build(node)
[docs]
class ResourceBuilder(ModelVisitor):
def __init__(self, resource: ResourceFile):
self.resource = resource
self.settings = FileSettings()
self.seen_keywords = NormalizedDict(ignore='_')
[docs]
def build(self, model: File):
ErrorReporter(model.source, raise_on_invalid_header=True).visit(model)
self.visit(model)
[docs]
def visit_Documentation(self, node):
self.resource.doc = node.value
[docs]
def visit_LibraryImport(self, node):
self.resource.imports.library(node.name, node.args, node.alias, node.lineno)
[docs]
def visit_ResourceImport(self, node):
self.resource.imports.resource(node.name, node.lineno)
[docs]
def visit_VariablesImport(self, node):
self.resource.imports.variables(node.name, node.args, node.lineno)
[docs]
def visit_Variable(self, node):
self.resource.variables.create(name=node.name,
value=node.value,
separator=node.separator,
lineno=node.lineno,
error=format_error(node.errors))
[docs]
def visit_Keyword(self, node):
KeywordBuilder(self.resource, self.settings, self.seen_keywords).build(node)
[docs]
class BodyBuilder(ModelVisitor):
def __init__(self, model: 'TestCase|UserKeyword|For|If|Try|While|None' = None):
self.model = model
[docs]
def visit_For(self, node):
ForBuilder(self.model).build(node)
[docs]
def visit_While(self, node):
WhileBuilder(self.model).build(node)
[docs]
def visit_If(self, node):
IfBuilder(self.model).build(node)
[docs]
def visit_Try(self, node):
TryBuilder(self.model).build(node)
[docs]
def visit_KeywordCall(self, node):
self.model.body.create_keyword(name=node.keyword, args=node.args,
assign=node.assign, lineno=node.lineno)
[docs]
def visit_TemplateArguments(self, node):
self.model.body.create_keyword(args=node.args, lineno=node.lineno)
[docs]
def visit_Var(self, node):
self.model.body.create_var(node.name, node.value, node.scope, node.separator,
lineno=node.lineno, error=format_error(node.errors))
[docs]
def visit_Return(self, node):
self.model.body.create_return(node.values, lineno=node.lineno,
error=format_error(node.errors))
[docs]
def visit_Continue(self, node):
self.model.body.create_continue(lineno=node.lineno,
error=format_error(node.errors))
[docs]
def visit_Break(self, node):
self.model.body.create_break(lineno=node.lineno,
error=format_error(node.errors))
[docs]
def visit_Error(self, node):
self.model.body.create_error(node.values, lineno=node.lineno,
error=format_error(node.errors))
[docs]
class TestCaseBuilder(BodyBuilder):
model: TestCase
def __init__(self, suite: TestSuite, settings: FileSettings):
super().__init__(suite.tests.create())
self.settings = settings
self._test_has_tags = False
[docs]
def build(self, node):
settings = self.settings
# Possible parsing errors aren't reported further with tests because:
# - We only validate that test body or name isn't empty.
# - That is validated again during execution.
# - This way e.g. model modifiers can add content to body.
self.model.config(name=node.name, tags=settings.test_tags,
timeout=settings.test_timeout,
template=settings.test_template,
lineno=node.lineno)
if settings.test_setup:
self.model.setup.config(**settings.test_setup)
if settings.test_teardown:
self.model.teardown.config(**settings.test_teardown)
self.generic_visit(node)
if not self._test_has_tags:
self.model.tags.add(settings.default_tags)
if self.model.template:
self._set_template(self.model, self.model.template)
def _set_template(self, parent, template):
for item in parent.body:
if item.type == item.FOR:
self._set_template(item, template)
elif item.type == item.IF_ELSE_ROOT:
for branch in item.body:
self._set_template(branch, template)
elif item.type == item.KEYWORD:
name, args = self._format_template(template, item.args)
item.name = name
item.args = args
def _format_template(self, template, arguments):
matches = VariableMatches(template, identifiers='$')
count = len(matches)
if count == 0 or count != len(arguments):
return template, arguments
temp = []
for match, arg in zip(matches, arguments):
temp[-1:] = [match.before, arg, match.after]
return ''.join(temp), ()
[docs]
def visit_Documentation(self, node):
self.model.doc = node.value
[docs]
def visit_Setup(self, node):
self.model.setup.config(name=node.name, args=node.args, lineno=node.lineno)
[docs]
def visit_Teardown(self, node):
self.model.teardown.config(name=node.name, args=node.args, lineno=node.lineno)
[docs]
def visit_Timeout(self, node):
self.model.timeout = node.value
[docs]
def visit_Template(self, node):
self.model.template = node.value
[docs]
class KeywordBuilder(BodyBuilder):
model: UserKeyword
def __init__(self, resource: ResourceFile, settings: FileSettings,
seen_keywords: NormalizedDict):
super().__init__(resource.keywords.create(tags=settings.keyword_tags))
self.resource = resource
self.seen_keywords = seen_keywords
self.return_setting = None
[docs]
def build(self, node):
kw = self.model
try:
# Validate only name here. Reporting all parsing errors would report also
# body being empty, but we want to validate it only at parsing time.
if not node.name:
raise DataError('User keyword name cannot be empty.')
kw.config(name=node.name, lineno=node.lineno)
except DataError as err:
# Errors other than name being empty mean that name contains invalid
# embedded arguments. Need to set `_name` to bypass `@property`.
kw.config(_name=node.name, lineno=node.lineno, error=str(err))
self._report_error(node, err)
self.generic_visit(node)
if self.return_setting:
kw.body.create_return(self.return_setting)
if not kw.embedded:
self._handle_duplicates(kw, self.seen_keywords, node)
def _report_error(self, node, error):
error = f"Creating keyword '{self.model.name}' failed: {error}"
ErrorReporter(self.model.source).report_error(node, error)
def _handle_duplicates(self, kw, seen, node):
if kw.name in seen:
error = 'Keyword with same name defined multiple times.'
seen[kw.name].error = error
self.resource.keywords.pop()
self._report_error(node, error)
else:
seen[kw.name] = kw
[docs]
def visit_Documentation(self, node):
self.model.doc = node.value
[docs]
def visit_Arguments(self, node):
if node.errors:
error = 'Invalid argument specification: ' + format_error(node.errors)
self.model.error = error
self._report_error(node, error)
else:
self.model.args = node.values
[docs]
def visit_ReturnSetting(self, node):
ErrorReporter(self.model.source).visit(node)
self.return_setting = node.values
[docs]
def visit_Timeout(self, node):
self.model.timeout = node.value
[docs]
def visit_Setup(self, node):
self.model.setup.config(name=node.name, args=node.args, lineno=node.lineno)
[docs]
def visit_Teardown(self, node):
self.model.teardown.config(name=node.name, args=node.args, lineno=node.lineno)
[docs]
def visit_KeywordCall(self, node):
self.model.body.create_keyword(name=node.keyword, args=node.args,
assign=node.assign, lineno=node.lineno)
[docs]
class ForBuilder(BodyBuilder):
model: For
def __init__(self, parent: 'TestCase|UserKeyword|For|If|Try|While'):
super().__init__(parent.body.create_for())
[docs]
def build(self, node):
error = format_error(self._get_errors(node))
self.model.config(assign=node.assign, flavor=node.flavor or 'IN',
values=node.values, start=node.start, mode=node.mode,
fill=node.fill, lineno=node.lineno, error=error)
for step in node.body:
self.visit(step)
return self.model
def _get_errors(self, node):
errors = node.header.errors + node.errors
if node.end:
errors += node.end.errors
return errors
[docs]
class IfBuilder(BodyBuilder):
model: 'IfBranch|None'
def __init__(self, parent: 'TestCase|UserKeyword|For|If|Try|While'):
super().__init__()
self.root = parent.body.create_if()
[docs]
def build(self, node):
self.root.config(lineno=node.lineno, error=format_error(self._get_errors(node)))
assign = node.assign
node_type = None
while node:
node_type = node.type if node.type != 'INLINE IF' else 'IF'
self.model = self.root.body.create_branch(node_type, node.condition,
lineno=node.lineno)
for step in node.body:
self.visit(step)
if assign:
for item in self.model.body:
# Having assign when model item doesn't support assign is an error,
# but it has been handled already when model was validated.
if hasattr(item, 'assign'):
item.assign = assign
node = node.orelse
# Smallish hack to make sure assignment is always run.
if assign and node_type != 'ELSE':
self.root.body.create_branch('ELSE').body.create_keyword(
assign=assign, name='BuiltIn.Set Variable', args=['${NONE}']
)
return self.root
def _get_errors(self, node):
errors = node.header.errors + node.errors
if node.orelse:
errors += self._get_errors(node.orelse)
if node.end:
errors += node.end.errors
return errors
[docs]
class TryBuilder(BodyBuilder):
model: 'TryBranch|None'
def __init__(self, parent: 'TestCase|UserKeyword|For|If|Try|While'):
super().__init__()
self.root = parent.body.create_try()
self.template_error = None
[docs]
def build(self, node):
self.root.config(lineno=node.lineno)
errors = self._get_errors(node)
while node:
self.model = self.root.body.create_branch(node.type, node.patterns,
node.pattern_type, node.assign,
lineno=node.lineno)
for step in node.body:
self.visit(step)
node = node.next
if self.template_error:
errors += (self.template_error,)
if errors:
self.root.error = format_error(errors)
return self.root
def _get_errors(self, node):
errors = node.header.errors + node.errors
if node.next:
errors += self._get_errors(node.next)
if node.end:
errors += node.end.errors
return errors
[docs]
def visit_TemplateArguments(self, node):
self.template_error = 'Templates cannot be used with TRY.'
[docs]
class WhileBuilder(BodyBuilder):
model: While
def __init__(self, parent: 'TestCase|UserKeyword|For|If|Try|While'):
super().__init__(parent.body.create_while())
[docs]
def build(self, node):
error = format_error(self._get_errors(node))
self.model.config(condition=node.condition, limit=node.limit,
on_limit=node.on_limit, on_limit_message=node.on_limit_message,
lineno=node.lineno, error=error)
for step in node.body:
self.visit(step)
return self.model
def _get_errors(self, node):
errors = node.header.errors + node.errors
if node.end:
errors += node.end.errors
return errors
[docs]
class ErrorReporter(ModelVisitor):
def __init__(self, source, raise_on_invalid_header=False):
self.source = source
self.raise_on_invalid_header = raise_on_invalid_header
[docs]
def visit_TestCase(self, node):
pass
[docs]
def visit_Keyword(self, node):
pass
[docs]
def visit_ReturnSetting(self, node):
# Empty 'visit_Keyword' above prevents calling this when visiting the whole
# model, but 'KeywordBuilder.visit_ReturnSetting' visits the node it gets.
self.report_error(node.get_token(Token.RETURN_SETTING), warn=True)
[docs]
def visit_Error(self, node):
for token in node.get_tokens(Token.ERROR):
self.report_error(token)
[docs]
def report_error(self, source, error=None, warn=False, throw=False):
if not error:
if isinstance(source, Token):
error = source.error
else:
error = format_error(source.errors)
message = f"Error in file '{self.source}' on line {source.lineno}: {error}"
if throw:
raise DataError(message)
LOGGER.write(message, level='WARN' if warn else 'ERROR')