Source code for robot.parsing.populators

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import os

from robot.errors import DataError
from robot.model import SuiteNamePatterns
from robot.output import LOGGER
from robot.utils import get_error_message, unic

from .datarow import DataRow
from .tablepopulators import (SettingTablePopulator, VariableTablePopulator,
                              TestTablePopulator, KeywordTablePopulator,
                              NullPopulator)
from .htmlreader import HtmlReader
from .tsvreader import TsvReader
from .robotreader import RobotReader
from .restreader import RestReader


READERS = {'html': HtmlReader, 'htm': HtmlReader, 'xhtml': HtmlReader,
           'tsv': TsvReader , 'rst': RestReader, 'rest': RestReader,
           'txt': RobotReader, 'robot': RobotReader}

# Hook for external tools for altering ${CURDIR} processing
PROCESS_CURDIR = True


[docs]class NoTestsFound(DataError): pass
[docs]class FromFilePopulator(object): _populators = {'setting': SettingTablePopulator, 'variable': VariableTablePopulator, 'test case': TestTablePopulator, 'keyword': KeywordTablePopulator} def __init__(self, datafile): self._datafile = datafile self._populator = NullPopulator() self._curdir = self._get_curdir(datafile.directory) def _get_curdir(self, path): return path.replace('\\','\\\\') if path else None
[docs] def populate(self, path, resource=False): LOGGER.info("Parsing file '%s'." % path) source = self._open(path) try: self._get_reader(path, resource).read(source, self) except: raise DataError(get_error_message()) finally: source.close()
def _open(self, path): if not os.path.isfile(path): raise DataError("File or directory to execute does not exist.") try: # IronPython handles BOM incorrectly if not using binary mode: # https://ironpython.codeplex.com/workitem/34655 return open(path, 'rb') except: raise DataError(get_error_message()) def _get_reader(self, path, resource=False): file_format = os.path.splitext(path.lower())[-1][1:] if resource and file_format == 'resource': file_format = 'robot' try: return READERS[file_format]() except KeyError: raise DataError("Unsupported file format '%s'." % file_format)
[docs] def start_table(self, header): self._populator.populate() table = self._datafile.start_table(DataRow(header).all) self._populator = self._populators[table.type](table) \ if table is not None else NullPopulator() return bool(self._populator)
[docs] def eof(self): self._populator.populate() self._populator = NullPopulator() return bool(self._datafile)
[docs] def add(self, row): if PROCESS_CURDIR and self._curdir: row = self._replace_curdirs_in(row) data = DataRow(row, self._datafile.source) if data: self._populator.add(data)
def _replace_curdirs_in(self, row): old, new = '${CURDIR}', self._curdir return [cell if old not in cell else cell.replace(old, new) for cell in row]
[docs]class FromDirectoryPopulator(object): ignored_prefixes = ('_', '.') ignored_dirs = ('CVS',)
[docs] def populate(self, path, datadir, include_suites=None, include_extensions=None, recurse=True): LOGGER.info("Parsing directory '%s'." % path) include_suites = self._get_include_suites(path, include_suites) init_file, children = self._get_children(path, include_extensions, include_suites) if init_file: self._populate_init_file(datadir, init_file) if recurse: self._populate_children(datadir, children, include_extensions, include_suites)
def _populate_init_file(self, datadir, init_file): datadir.initfile = init_file try: FromFilePopulator(datadir).populate(init_file) except DataError as err: LOGGER.error(err.message) def _populate_children(self, datadir, children, include_extensions, include_suites): for child in children: try: datadir.add_child(child, include_suites, include_extensions) except NoTestsFound: LOGGER.info("Data source '%s' has no tests or tasks." % child) except DataError as err: LOGGER.error("Parsing '%s' failed: %s" % (child, err.message)) def _get_include_suites(self, path, incl_suites): if not incl_suites: return None if not isinstance(incl_suites, SuiteNamePatterns): incl_suites = SuiteNamePatterns( self._create_included_suites(incl_suites)) # If a directory is included, also all its children should be included. if self._is_in_included_suites(os.path.basename(path), incl_suites): return None return incl_suites def _create_included_suites(self, incl_suites): for suite in incl_suites: yield suite while '.' in suite: suite = suite.split('.', 1)[1] yield suite def _get_children(self, dirpath, incl_extensions, incl_suites): init_file = None children = [] for path, is_init_file in self._list_dir(dirpath, incl_extensions, incl_suites): if is_init_file: if not init_file: init_file = path else: LOGGER.error("Ignoring second test suite init file '%s'." % path) else: children.append(path) return init_file, children def _list_dir(self, dir_path, incl_extensions, incl_suites): # os.listdir returns Unicode entries when path is Unicode dir_path = unic(dir_path) names = os.listdir(dir_path) for name in sorted(names, key=lambda item: item.lower()): name = unic(name) # needed to handle nfc/nfd normalization on OSX path = os.path.join(dir_path, name) base, ext = os.path.splitext(name) ext = ext[1:].lower() if self._is_init_file(path, base, ext, incl_extensions): yield path, True elif self._is_included(path, base, ext, incl_extensions, incl_suites): yield path, False else: LOGGER.info("Ignoring file or directory '%s'." % path) def _is_init_file(self, path, base, ext, incl_extensions): return (base.lower() == '__init__' and self._extension_is_accepted(ext, incl_extensions) and os.path.isfile(path)) def _extension_is_accepted(self, ext, incl_extensions): if incl_extensions: return ext in incl_extensions return ext in READERS def _is_included(self, path, base, ext, incl_extensions, incl_suites): if base.startswith(self.ignored_prefixes): return False if os.path.isdir(path): return base not in self.ignored_dirs or ext if not self._extension_is_accepted(ext, incl_extensions): return False return self._is_in_included_suites(base, incl_suites) def _is_in_included_suites(self, name, incl_suites): if not incl_suites: return True return incl_suites.match(self._split_prefix(name)) def _split_prefix(self, name): return name.split('__', 1)[-1]