Source code for robot.conf.settings

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import glob
import os
import random
import string
import sys
import warnings
from datetime import datetime
from pathlib import Path

from robot.errors import DataError, FrameworkError
from robot.output import LOGGER, loggerhelper
from robot.result.keywordremover import KeywordRemover
from robot.result.flattenkeywordmatcher import validate_flatten_keyword
from robot.utils import (abspath, create_destination_directory, escape,
                         get_link_path, html_escape, is_list_like, plural_or_not as s,
                         seq2str, split_args_from_name_or_path)

from .gatherfailed import gather_failed_tests, gather_failed_suites
from .languages import Languages


class _BaseSettings:
    _cli_opts = {'RPA'              : ('rpa', None),
                 'Name'             : ('name', None),
                 'Doc'              : ('doc', None),
                 'Metadata'         : ('metadata', []),
                 'TestNames'        : ('test', []),
                 'TaskNames'        : ('task', []),
                 'SuiteNames'       : ('suite', []),
                 'ParseInclude'     : ('parseinclude', []),
                 'SetTag'           : ('settag', []),
                 'Include'          : ('include', []),
                 'Exclude'          : ('exclude', []),
                 'OutputDir'        : ('outputdir', abspath('.')),
                 'LegacyOutput'     : ('legacyoutput', False),
                 'Log'              : ('log', 'log.html'),
                 'Report'           : ('report', 'report.html'),
                 'XUnit'            : ('xunit', None),
                 'SplitLog'         : ('splitlog', False),
                 'TimestampOutputs' : ('timestampoutputs', False),
                 'LogTitle'         : ('logtitle', None),
                 'ReportTitle'      : ('reporttitle', None),
                 'ReportBackground' : ('reportbackground', ('#9e9', '#f66', '#fed84f')),
                 'SuiteStatLevel'   : ('suitestatlevel', -1),
                 'TagStatInclude'   : ('tagstatinclude', []),
                 'TagStatExclude'   : ('tagstatexclude', []),
                 'TagStatCombine'   : ('tagstatcombine', []),
                 'TagDoc'           : ('tagdoc', []),
                 'TagStatLink'      : ('tagstatlink', []),
                 'RemoveKeywords'   : ('removekeywords', []),
                 'ExpandKeywords'   : ('expandkeywords', []),
                 'FlattenKeywords'  : ('flattenkeywords', []),
                 'PreRebotModifiers': ('prerebotmodifier', []),
                 'StatusRC'         : ('statusrc', True),
                 'ConsoleColors'    : ('consolecolors', 'AUTO'),
                 'PythonPath'       : ('pythonpath', []),
                 'StdOut'           : ('stdout', None),
                 'StdErr'           : ('stderr', None)}
    _output_opts = ['Output', 'Log', 'Report', 'XUnit', 'DebugFile']

    def __init__(self, options=None, **extra_options):
        self.start_time = datetime.now()
        self._opts = {}
        self._cli_opts = self._cli_opts.copy()
        self._cli_opts.update(self._extra_cli_opts)
        self._process_cli_opts(dict(options or {}, **extra_options))

    def _process_cli_opts(self, opts):
        for name, (cli_name, default) in self._cli_opts.items():
            value = opts.pop(cli_name) if cli_name in opts else default
            if isinstance(default, list):
                # Copy mutable values and support list values as scalars.
                value = list(value) if is_list_like(value) else [value]
            self[name] = self._process_value(name, value)
        if opts:
            raise DataError(f'Invalid option{s(opts)} {seq2str(opts)}.')

    def __setitem__(self, name, value):
        if name not in self._cli_opts:
            raise KeyError(f"Non-existing option '{name}'.")
        self._opts[name] = value

    def _process_value(self, name, value):
        if name == 'LogLevel':
            return self._process_log_level(value)
        if value == self._get_default_value(name):
            return value
        if name == 'Doc':
            return self._process_doc(value)
        if name == 'Metadata':
            return [self._process_metadata(v) for v in value]
        if name == 'TagDoc':
            return [self._process_tagdoc(v) for v in value]
        if name in ['Include', 'Exclude']:
            return [self._format_tag_patterns(v) for v in value]
        if name in self._output_opts or name in ['ReRunFailed', 'ReRunFailedSuites']:
            if isinstance(value, Path):
                return str(value)
            return value if value and value.upper() != 'NONE' else None
        if name == 'OutputDir':
            return Path(value).absolute()
        if name in ['SuiteStatLevel', 'ConsoleWidth']:
            return self._convert_to_positive_integer_or_default(name, value)
        if name == 'VariableFiles':
            return [split_args_from_name_or_path(item) for item in value]
        if name == 'ReportBackground':
            return self._process_report_background(value)
        if name == 'TagStatCombine':
            return [self._process_tag_stat_combine(v) for v in value]
        if name == 'TagStatLink':
            return [v for v in [self._process_tag_stat_link(v) for v in value] if v]
        if name == 'Randomize':
            return self._process_randomize_value(value)
        if name == 'MaxErrorLines':
            return self._process_max_error_lines(value)
        if name == 'MaxAssignLength':
            return self._process_max_assign_length(value)
        if name == 'PythonPath':
            return self._process_pythonpath(value)
        if name == 'RemoveKeywords':
            self._validate_remove_keywords(value)
        if name == 'FlattenKeywords':
            self._validate_flatten_keywords(value)
        if name == 'ExpandKeywords':
            self._validate_expandkeywords(value)
        if name == 'Extension':
            return tuple('.' + ext.lower().lstrip('.') for ext in value.split(':'))
        return value

    def _process_doc(self, value):
        if isinstance(value, Path) or (os.path.isfile(value) and value.strip() == value):
            try:
                with open(value) as f:
                    value = f.read()
            except (OSError, IOError) as err:
                self._raise_invalid('Doc', f"Reading documentation from '{value}' "
                                           f"failed: {err}")
        return self._escape_doc(value).strip()

    def _escape_doc(self, value):
        return value

    def _process_log_level(self, level):
        level, visible_level = self._split_log_level(level.upper())
        self._opts['VisibleLogLevel'] = visible_level
        return level

    def _split_log_level(self, level):
        if ':' in level:
            log_level, visible_level = level.split(':', 1)
        else:
            log_level = visible_level = level
        for level in log_level, visible_level:
            if level not in loggerhelper.LEVELS:
                self._raise_invalid('LogLevel', f"Invalid level '{level}'.")
        if not loggerhelper.IsLogged(log_level)(visible_level):
            self._raise_invalid('LogLevel', f"Level in log '{visible_level}' is lower "
                                            f"than execution level '{log_level}'.")
        return log_level, visible_level

    def _process_max_error_lines(self, value):
        if not value or value.upper() == 'NONE':
            return None
        value = self._convert_to_integer('MaxErrorLines', value)
        if value < 10:
            self._raise_invalid('MaxErrorLines',
                                f"Expected integer greater than 10, got {value}.")
        return value

    def _process_max_assign_length(self, value):
        value = self._convert_to_integer('MaxAssignLength', value)
        return max(value, 0)

    def _process_randomize_value(self, original):
        value = original.upper()
        if ':' in value:
            value, seed = value.split(':', 1)
        else:
            seed = random.randint(0, sys.maxsize)
        if value in ('TEST', 'SUITE'):
            value += 'S'
        valid = ('TESTS', 'SUITES', 'ALL', 'NONE')
        if value not in valid:
            valid = seq2str(valid, lastsep=' or ')
            self._raise_invalid('Randomize', f"Expected {valid}, got '{value}'.")
        try:
            seed = int(seed)
        except ValueError:
            self._raise_invalid('Randomize', f"Seed should be integer, got '{seed}'.")
        return value, seed

    def __getitem__(self, name):
        if name not in self._opts:
            raise KeyError(f"Non-existing option '{name}'.")
        if name in self._output_opts:
            return self._get_output_file(name)
        return self._opts[name]

    def _get_output_file(self, option):
        """Returns path of the requested output file and creates needed dirs.

        `option` can be 'Output', 'Log', 'Report', 'XUnit' or 'DebugFile'.
        """
        name = self._opts[option]
        if not name:
            return None
        if option == 'Log' and self._output_disabled():
            self['Log'] = None
            LOGGER.error('Log file cannot be created if output.xml is disabled.')
            return None
        name = self._process_output_name(option, name)
        path = self.output_directory / name
        create_destination_directory(path, f'{option.lower()} file')
        return path

    def _process_output_name(self, option, name):
        base, ext = os.path.splitext(name)
        if self['TimestampOutputs']:
            s = self.start_time
            base = (f'{base}-{s.year}{s.month:02}{s.day:02}-'
                    f'{s.hour:02}{s.minute:02}{s.second:02}')
        ext = self._get_output_extension(ext, option)
        return base + ext

    def _get_output_extension(self, extension, file_type):
        if extension:
            return extension
        if file_type in ['Output', 'XUnit']:
            return '.xml'
        if file_type in ['Log', 'Report']:
            return '.html'
        if file_type == 'DebugFile':
            return '.txt'
        raise FrameworkError(f"Invalid output file type '{file_type}'.")

    def _process_metadata(self, value):
        name, value = self._split_from_colon(value)
        return name, self._process_doc(value)

    def _split_from_colon(self, value):
        if ':' in value:
            return value.split(':', 1)
        return value, ''

    def _process_tagdoc(self, value):
        return self._split_from_colon(value)

    def _process_report_background(self, colors):
        if colors.count(':') not in [1, 2]:
            self._raise_invalid('ReportBackground', f"Expected format 'pass:fail:skip' "
                                                    f"or 'pass:fail', got '{colors}'.")
        colors = colors.split(':')
        if len(colors) == 2:
            return colors[0], colors[1], '#fed84f'
        return tuple(colors)

    def _process_tag_stat_combine(self, pattern):
        if ':' in pattern:
            pattern, title = pattern.rsplit(':', 1)
        else:
            title = ''
        return self._format_tag_patterns(pattern), title

    def _format_tag_patterns(self, pattern):
        for search, replace in [('&', 'AND'), ('AND', ' AND '), ('OR', ' OR '),
                                ('NOT', ' NOT '), ('_', ' ')]:
            if search in pattern:
                pattern = pattern.replace(search, replace)
        while '  ' in pattern:
            pattern = pattern.replace('  ', ' ')
        if pattern.startswith(' NOT'):
            pattern = pattern[1:]
        return pattern

    def _process_tag_stat_link(self, value):
        tokens = value.split(':')
        if len(tokens) >= 3:
            return tokens[0], ':'.join(tokens[1:-1]), tokens[-1]
        self._raise_invalid('TagStatLink',
                            f"Expected format 'tag:link:title', got '{value}'.")

    def _convert_to_positive_integer_or_default(self, name, value):
        value = self._convert_to_integer(name, value)
        return value if value > 0 else self._get_default_value(name)

    def _convert_to_integer(self, name, value):
        try:
            return int(value)
        except ValueError:
            self._raise_invalid(name, f"Expected integer, got '{value}'.")

    def _get_default_value(self, name):
        return self._cli_opts[name][1]

    def _process_pythonpath(self, paths):
        return [os.path.abspath(globbed)
                for path in paths
                for split in self._split_pythonpath(path)
                for globbed in glob.glob(split) or [split]]

    def _split_pythonpath(self, path):
        path = path.replace('/', os.sep)
        if ';' in path:
            yield from path.split(';')
        elif os.sep == '/':
            yield from path.split(':')
        else:
            drive = ''
            for item in path.split(':'):
                if drive:
                    if item.startswith('\\'):
                        yield f'{drive}:{item}'
                        drive = ''
                        continue
                    yield drive
                    drive = ''
                if len(item) == 1 and item in string.ascii_letters:
                    drive = item
                else:
                    yield item
            if drive:
                yield drive

    def _validate_remove_keywords(self, values):
        for value in values:
            try:
                KeywordRemover.from_config(value)
            except DataError as err:
                self._raise_invalid('RemoveKeywords', err)

    def _validate_flatten_keywords(self, values):
        try:
            validate_flatten_keyword(values)
        except DataError as err:
            self._raise_invalid('FlattenKeywords', err)

    def _validate_expandkeywords(self, values):
        for opt in values:
            if not opt.lower().startswith(('name:', 'tag:')):
                self._raise_invalid('ExpandKeywords', f"Expected 'TAG:<pattern>' or "
                                                      f"'NAME:<pattern>', got '{opt}'.")

    def _raise_invalid(self, option, error):
        raise DataError(f"Invalid value for option '--{option.lower()}': {error}")

    def __contains__(self, setting):
        return setting in self._opts

    def __str__(self):
        return '\n'.join(f'{name}: {self._opts[name]}' for name in sorted(self._opts))

    @property
    def output_directory(self) -> Path:
        return Path(self['OutputDir'])

    @property
    def output(self) -> 'Path|None':
        return self['Output']

    @property
    def legacy_output(self) -> bool:
        return self['LegacyOutput']

    @property
    def log(self) -> 'Path|None':
        return self['Log']

    @property
    def report(self) -> 'Path|None':
        return self['Report']

    @property
    def xunit(self) -> 'Path|None':
        return self['XUnit']

    @property
    def log_level(self):
        return self['LogLevel']

    @property
    def split_log(self):
        return self['SplitLog']

    @property
    def suite_names(self):
        return self._filter_empty(self['SuiteNames'])

    def _filter_empty(self, items):
        return [i for i in items if i] or None

    @property
    def test_names(self):
        return self._filter_empty(self['TestNames'] + self['TaskNames'])

    @property
    def include(self):
        return self._filter_empty(self['Include'])

    @property
    def exclude(self):
        return self._filter_empty(self['Exclude'])

    @property
    def parse_include(self):
        return self['ParseInclude']

    @property
    def pythonpath(self):
        return self['PythonPath']

    @property
    def status_rc(self):
        return self['StatusRC']

    @property
    def statistics_config(self):
        return {
            'suite_stat_level': self['SuiteStatLevel'],
            'tag_stat_include': self['TagStatInclude'],
            'tag_stat_exclude': self['TagStatExclude'],
            'tag_stat_combine': self['TagStatCombine'],
            'tag_stat_link': self['TagStatLink'],
            'tag_doc': self['TagDoc'],
        }

    @property
    def remove_keywords(self):
        return self['RemoveKeywords']

    @property
    def flatten_keywords(self):
        return self['FlattenKeywords']

    @property
    def pre_rebot_modifiers(self):
        return self['PreRebotModifiers']

    @property
    def console_colors(self):
        return self['ConsoleColors']

    @property
    def rpa(self):
        return self['RPA']

    @rpa.setter
    def rpa(self, value):
        self['RPA'] = value


[docs] class RobotSettings(_BaseSettings): _extra_cli_opts = {'Extension' : ('extension', ('.robot', '.rbt', '.robot.rst')), 'Output' : ('output', 'output.xml'), 'LogLevel' : ('loglevel', 'INFO'), 'MaxErrorLines' : ('maxerrorlines', 40), 'MaxAssignLength' : ('maxassignlength', 200), 'DryRun' : ('dryrun', False), 'ExitOnFailure' : ('exitonfailure', False), 'ExitOnError' : ('exitonerror', False), 'Skip' : ('skip', []), 'SkipOnFailure' : ('skiponfailure', []), 'SkipTeardownOnExit' : ('skipteardownonexit', False), 'ReRunFailed' : ('rerunfailed', None), 'ReRunFailedSuites' : ('rerunfailedsuites', None), 'Randomize' : ('randomize', 'NONE'), 'RunEmptySuite' : ('runemptysuite', False), 'Variables' : ('variable', []), 'VariableFiles' : ('variablefile', []), 'Parsers' : ('parser', []), 'PreRunModifiers' : ('prerunmodifier', []), 'Listeners' : ('listener', []), 'ConsoleType' : ('console', 'verbose'), 'ConsoleTypeDotted' : ('dotted', False), 'ConsoleTypeQuiet' : ('quiet', False), 'ConsoleWidth' : ('consolewidth', 78), 'ConsoleMarkers' : ('consolemarkers', 'AUTO'), 'DebugFile' : ('debugfile', None), 'Language' : ('language', [])} _languages = None
[docs] def get_rebot_settings(self): settings = RebotSettings() settings.start_time = self.start_time not_copied = {'Include', 'Exclude', 'TestNames', 'SuiteNames', 'ParseInclude', 'Name', 'Doc', 'Metadata', 'SetTag', 'Output', 'LogLevel', 'TimestampOutputs'} for opt in settings._opts: if opt in self and opt not in not_copied: settings._opts[opt] = self[opt] settings._opts['ProcessEmptySuite'] = self['RunEmptySuite'] return settings
def _output_disabled(self): return self.output is None def _escape_doc(self, value): return escape(value) @property def listeners(self): return self['Listeners'] @property def debug_file(self): return self['DebugFile'] @property def languages(self): if self._languages is None: try: self._languages = Languages(self['Language']) except DataError as err: self._raise_invalid('Language', err) return self._languages @property def suite_config(self): return { 'name': self['Name'], 'doc': self['Doc'], 'metadata': dict(self['Metadata']), 'set_tags': self['SetTag'], 'include_tags': self.include, 'exclude_tags': self.exclude, 'include_suites': self.suite_names, 'include_tests': self.test_names, 'empty_suite_ok': self.run_empty_suite, 'randomize_suites': self.randomize_suites, 'randomize_tests': self.randomize_tests, 'randomize_seed': self.randomize_seed, } @property def suite_names(self): return self._names_and_rerun() @property def test_names(self): return self._names_and_rerun(for_test=True) def _names_and_rerun(self, for_test=False): if for_test: names = self['TestNames'] + self['TaskNames'] rerun = gather_failed_tests(self['ReRunFailed'], self['RunEmptySuite']) else: names = self['SuiteNames'] rerun = gather_failed_suites(self['ReRunFailedSuites'], self['RunEmptySuite']) # `rerun` is None if `--rerunfailed(suites)` wasn't used and a list otherwise. # The list is empty all tests passed and running empty suite is allowed. if rerun: return names + rerun return names or rerun @property def randomize_seed(self): return self['Randomize'][1] @property def randomize_suites(self): return self['Randomize'][0] in ('SUITES', 'ALL') @property def randomize_tests(self): return self['Randomize'][0] in ('TESTS', 'ALL') @property def dry_run(self): return self['DryRun'] @property def exit_on_failure(self): return self['ExitOnFailure'] @property def exit_on_error(self): return self['ExitOnError'] @property def skip(self): return self['Skip'] @property def skipped_tags(self): warnings.warn("'RobotSettings.skipped_tags' is deprecated. Use 'skip' instead.") return self.skip @property def skip_on_failure(self): return self['SkipOnFailure'] @property def skip_teardown_on_exit(self): return self['SkipTeardownOnExit'] @property def console_output_config(self): return { 'type': self.console_type, 'width': self.console_width, 'colors': self.console_colors, 'markers': self.console_markers, 'stdout': self['StdOut'], 'stderr': self['StdErr'] } @property def console_type(self): if self['ConsoleTypeQuiet']: return 'quiet' if self['ConsoleTypeDotted']: return 'dotted' return self['ConsoleType'] @property def console_width(self): return self['ConsoleWidth'] @property def console_markers(self): return self['ConsoleMarkers'] @property def max_error_lines(self): return self['MaxErrorLines'] @property def max_assign_length(self): return self['MaxAssignLength'] @property def parsers(self): return self['Parsers'] @property def pre_run_modifiers(self): return self['PreRunModifiers'] @property def run_empty_suite(self): return self['RunEmptySuite'] @property def variables(self): return self['Variables'] @property def variable_files(self): return self['VariableFiles'] @property def extension(self): return self['Extension']
[docs] class RebotSettings(_BaseSettings): _extra_cli_opts = {'Output' : ('output', None), 'LogLevel' : ('loglevel', 'TRACE'), 'ProcessEmptySuite' : ('processemptysuite', False), 'StartTime' : ('starttime', None), 'EndTime' : ('endtime', None), 'Merge' : ('merge', False)} def _output_disabled(self): return False @property def suite_config(self): return { 'name': self['Name'], 'doc': self['Doc'], 'metadata': dict(self['Metadata']), 'set_tags': self['SetTag'], 'include_tags': self.include, 'exclude_tags': self.exclude, 'include_suites': self.suite_names, 'include_tests': self.test_names, 'empty_suite_ok': self.process_empty_suite, 'remove_keywords': self.remove_keywords, 'log_level': self['LogLevel'], 'start_time': self['StartTime'], 'end_time': self['EndTime'] } @property def log_config(self): if not self.log: return {} return { 'rpa': self.rpa, 'title': html_escape(self['LogTitle'] or ''), 'reportURL': self._url_from_path(self.log, self.report), 'splitLogBase': os.path.basename(os.path.splitext(self.log)[0]), 'defaultLevel': self['VisibleLogLevel'] } @property def report_config(self): if not self.report: return {} return { 'rpa': self.rpa, 'title': html_escape(self['ReportTitle'] or ''), 'logURL': self._url_from_path(self.report, self.log), 'background' : self._resolve_background_colors() } def _url_from_path(self, source, destination): if not destination: return None return get_link_path(destination, os.path.dirname(source)) def _resolve_background_colors(self): colors = self['ReportBackground'] return {'pass': colors[0], 'fail': colors[1], 'skip': colors[2]} @property def merge(self): return self['Merge'] @property def console_output_config(self): return { 'colors': self.console_colors, 'stdout': self['StdOut'], 'stderr': self['StdErr'] } @property def process_empty_suite(self): return self['ProcessEmptySuite'] @property def expand_keywords(self): return self['ExpandKeywords']